Consumer.cpp

Go to the documentation of this file.
00001 // Consumer.cpp,v 1.19 2006/03/14 06:14:34 jtc Exp
00002 
00003 #include "orbsvcs/Notify/Consumer.h"
00004 
00005 #if ! defined (__ACE_INLINE__)
00006 #include "orbsvcs/Notify/Consumer.inl"
00007 #endif /* __ACE_INLINE__ */
00008 
00009 ACE_RCSID (RT_Notify, TAO_Notify_Consumer, "Consumer.cpp,v 1.19 2006/03/14 06:14:34 jtc Exp")
00010 
00011 #include "orbsvcs/Notify/Timer.h"
00012 #include "orbsvcs/Notify/ProxySupplier.h"
00013 #include "orbsvcs/Notify/Method_Request_Event.h"
00014 
00015 #include "orbsvcs/Time_Utilities.h"
00016 
00017 #include "tao/debug.h"
00018 #include "tao/corba.h"
00019 
00020 #include "ace/Bound_Ptr.h"
00021 #include "ace/Unbounded_Queue.h"
00022 
00023 #ifndef DEBUG_LEVEL
00024 # define DEBUG_LEVEL TAO_debug_level
00025 #endif //DEBUG_LEVEL
00026 
00027 static const int DEFAULT_RETRY_TIMEOUT = 10;//120; // Note : This should be a config param or qos setting
00028 
00029 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00030 
00031 TAO_Notify_Consumer::TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy)
00032 : proxy_ (proxy)
00033 , is_suspended_ (0)
00034 , pacing_ (proxy->qos_properties_.pacing_interval ())
00035 , max_batch_size_ (CosNotification::MaximumBatchSize, 0)
00036 , timer_id_ (-1)
00037 , timer_ (0)
00038 {
00039   Request_Queue* pending_events = 0;
00040   ACE_NEW (pending_events, TAO_Notify_Consumer::Request_Queue ());
00041   this->pending_events_.reset( pending_events );
00042 
00043   this->timer_.reset( this->proxy ()->timer () );
00044 }
00045 
00046 TAO_Notify_Consumer::~TAO_Notify_Consumer ()
00047 {
00048   if (this->timer_.isSet())
00049   {
00050     this->cancel_timer ();
00051     this->timer_.reset ();
00052   }
00053 }
00054 
00055 TAO_Notify_Proxy*
00056 TAO_Notify_Consumer::proxy (void)
00057 {
00058   return this->proxy_supplier ();
00059 }
00060 
00061 void
00062 TAO_Notify_Consumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties)
00063 {
00064   this->max_batch_size_ = qos_properties.maximum_batch_size ();
00065 }
00066 
00067 void
00068 TAO_Notify_Consumer::resume (ACE_ENV_SINGLE_ARG_DECL)
00069 {
00070   this->is_suspended_ = 0;
00071 
00072   this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
00073 }
00074 
00075 void
00076 TAO_Notify_Consumer::enqueue_request (
00077   TAO_Notify_Method_Request_Event * request
00078   ACE_ENV_ARG_DECL)
00079 {
00080   TAO_Notify_Event::Ptr event (
00081     request->event ()->queueable_copy (ACE_ENV_SINGLE_ARG_PARAMETER));
00082   ACE_CHECK;
00083 
00084   TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00085   ACE_NEW_THROW_EX (queue_entry,
00086     TAO_Notify_Method_Request_Event_Queueable (*request, event),
00087     CORBA::NO_MEMORY ());
00088   ACE_CHECK;
00089 
00090   if (DEBUG_LEVEL  > 3) ACE_DEBUG ( (LM_DEBUG,
00091     ACE_TEXT ("Consumer %d: enqueue_request (%d) @%@.\n"),
00092     static_cast<int> (this->proxy ()->id ()),
00093     request->sequence (),
00094     request
00095     ));
00096   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00097   this->pending_events().enqueue_tail (queue_entry);
00098 }
00099 
00100 bool
00101 TAO_Notify_Consumer::enqueue_if_necessary (TAO_Notify_Method_Request_Event * request ACE_ENV_ARG_DECL)
00102 {
00103   ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock (), false);
00104   if (! this->pending_events().is_empty ())
00105     {
00106       if (DEBUG_LEVEL > 3)
00107         ACE_DEBUG ((LM_DEBUG,
00108                     ACE_TEXT ("Consumer %d: enqueuing another event. %d\n"),
00109                     static_cast<int> (this->proxy ()->id ()),
00110                     request->sequence ()
00111                     ));
00112       TAO_Notify_Event::Ptr event (
00113         request->event ()->queueable_copy (ACE_ENV_SINGLE_ARG_PARAMETER));
00114       ACE_CHECK_RETURN (false);
00115       TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00116       ACE_NEW_THROW_EX (queue_entry,
00117                         TAO_Notify_Method_Request_Event_Queueable (*request,
00118                                                                    event),
00119                         CORBA::NO_MEMORY ());
00120       ACE_CHECK_RETURN (false);
00121       this->pending_events().enqueue_tail (queue_entry);
00122       this->schedule_timer (false);
00123       return true;
00124     }
00125   if (this->is_suspended_ == 1)
00126     {
00127       if (DEBUG_LEVEL > 3)
00128         ACE_DEBUG ((LM_DEBUG,
00129                     ACE_TEXT ("Suspended Consumer %d enqueing event. %d\n"),
00130                     static_cast<int> (this->proxy ()->id ()),
00131                     request->sequence ()
00132                     ));
00133       TAO_Notify_Event::Ptr event (
00134         request->event ()->queueable_copy (ACE_ENV_SINGLE_ARG_PARAMETER));
00135       ACE_CHECK_RETURN (false);
00136       TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00137       ACE_NEW_THROW_EX (queue_entry,
00138                         TAO_Notify_Method_Request_Event_Queueable (*request,
00139                                                                    event),
00140                         CORBA::NO_MEMORY ());
00141       ACE_CHECK_RETURN (false);
00142       this->pending_events().enqueue_tail (queue_entry);
00143       this->schedule_timer (false);
00144       return true;
00145     }
00146   return false;
00147 }
00148 
00149 void
00150 TAO_Notify_Consumer::deliver (TAO_Notify_Method_Request_Event * request
00151                               ACE_ENV_ARG_DECL)
00152 {
00153   // Increment reference counts (safely) to prevent this object and its proxy
00154   // from being deleted while the push is in progress.
00155   TAO_Notify_Proxy::Ptr proxy_guard (this->proxy ());
00156   bool queued = enqueue_if_necessary (request ACE_ENV_ARG_PARAMETER);
00157   ACE_CHECK;
00158   if (!queued)
00159     {
00160       DispatchStatus status = this->dispatch_request (request);
00161       switch (status)
00162         {
00163         case DISPATCH_SUCCESS:
00164           {
00165             request->complete ();
00166             break;
00167           }
00168         case DISPATCH_RETRY:
00169           {
00170             if (DEBUG_LEVEL > 1)
00171               ACE_DEBUG ((LM_DEBUG,
00172                           ACE_TEXT ("Consumer %d enqueing event %d due ")
00173                           ACE_TEXT ("to failed dispatch.\n"),
00174                           static_cast<int> (this->proxy ()->id ()),
00175                           request->sequence ()));
00176             this->enqueue_request (request ACE_ENV_ARG_PARAMETER);
00177             ACE_CHECK;
00178             this->schedule_timer (true);
00179             break;
00180           }
00181         case DISPATCH_DISCARD:
00182           {
00183             if (DEBUG_LEVEL  > 0)
00184               ACE_DEBUG ((LM_DEBUG,
00185                           ACE_TEXT ("(%P|%t) Consumer %d: Error during "
00186                                     "direct dispatch. Discarding event:%d.\n"),
00187                           static_cast<int> (this->proxy ()->id ()),
00188                           request->sequence ()
00189                           ));
00190             request->complete ();
00191             break;
00192           }
00193         case DISPATCH_FAIL:
00194           {
00195             if (DEBUG_LEVEL  > 0)
00196               ACE_DEBUG ((LM_DEBUG,
00197                           ACE_TEXT ("(%P|%t) Consumer %d: Failed during "
00198                                     "direct dispatch :%d. Discarding event.\n"),
00199                           static_cast<int> (this->proxy ()->id ()),
00200                           request->sequence ()
00201                           ));
00202             request->complete ();
00203             ACE_DECLARE_NEW_ENV;
00204             ACE_TRY
00205               {
00206                 this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00207                 ACE_TRY_CHECK;
00208               }
00209             ACE_CATCHANY
00210               {
00211                 // todo is there something meaningful we can do here?
00212                 ;
00213               }
00214             ACE_ENDTRY;
00215             break;
00216           }
00217         }
00218     }
00219 }
00220 
00221 TAO_Notify_Consumer::DispatchStatus
00222 TAO_Notify_Consumer::dispatch_request (TAO_Notify_Method_Request_Event * request)
00223 {
00224   DispatchStatus result = DISPATCH_SUCCESS;
00225   ACE_DECLARE_NEW_ENV;
00226   ACE_TRY
00227     {
00228       request->event ()->push (this ACE_ENV_ARG_PARAMETER);
00229       ACE_TRY_CHECK;
00230       if (DEBUG_LEVEL  > 8)
00231         ACE_DEBUG ((LM_DEBUG,
00232                     ACE_TEXT ("Consumer %d dispatched single event %d.\n"),
00233                     static_cast<int> (this->proxy ()->id ()),
00234                     request->sequence ()
00235                     ));
00236     }
00237   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00238     {
00239       if (DEBUG_LEVEL  > 0)
00240         {
00241           ACE_DEBUG ((LM_ERROR,
00242                       ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00243                                 "(request) %s\n"),
00244                       static_cast<int> (this->proxy ()->id ()),
00245                       ex._info ().c_str ()
00246                       ));
00247         }
00248       result = DISPATCH_FAIL;
00249     }
00250   ACE_CATCH (CORBA::TRANSIENT, ex)
00251     {
00252       if (DEBUG_LEVEL  > 0)
00253         ACE_DEBUG ((LM_ERROR,
00254                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00255                               "(request) Transient (minor=%d) %s\n"),
00256                     static_cast<int> (this->proxy ()->id ()),
00257                     ex.minor (),
00258                     ex._info ().c_str ()
00259                     ));
00260       const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00261       switch (ex.minor () & 0xfffff000u)
00262         {
00263         case CORBA::OMGVMCID:
00264           switch (ex.minor () & 0x00000fffu)
00265             {
00266             case 2: // No usable profile
00267             case 3: // Request cancelled
00268             case 4: // POA destroyed
00269               result = DISPATCH_FAIL;
00270               break;
00271             default:
00272               result = DISPATCH_DISCARD;
00273             }
00274           break;
00275 
00276         case TAO::VMCID:
00277         default:
00278           switch (ex.minor () & BITS_5_THRU_12_MASK)
00279             {
00280             case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00281               result = DISPATCH_FAIL;
00282               break;
00283             case TAO_POA_DISCARDING:
00284             case TAO_POA_HOLDING:
00285             default:
00286               result = DISPATCH_RETRY;
00287             } break;
00288         }
00289     }
00290   ACE_CATCH (CORBA::TIMEOUT, ex)
00291     {
00292       if (DEBUG_LEVEL  > 0)
00293         ACE_DEBUG ((LM_ERROR,
00294                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push "
00295                               "(request) %s\n"),
00296                     this->proxy ()->id (),
00297                     ex._info().c_str ()
00298                     ));
00299       result = DISPATCH_FAIL;
00300     }
00301   ACE_CATCH (CORBA::COMM_FAILURE, ex)
00302     {
00303       if (DEBUG_LEVEL  > 0)
00304         ACE_DEBUG ((LM_ERROR,
00305                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push "
00306                               "(request) %s\n"),
00307                     this->proxy ()->id (),
00308                     ex._info().c_str ()
00309                     ));
00310       result = DISPATCH_FAIL;
00311     }
00312   ACE_CATCH (CORBA::SystemException, ex)
00313     {
00314       if (DEBUG_LEVEL  > 0)
00315         {
00316           ACE_DEBUG ((LM_ERROR,
00317                       ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00318                                 "(request) SystemException %s\n"),
00319                       static_cast<int> (this->proxy ()->id ()),
00320                       ex._info ().c_str ()
00321                       ));
00322         }
00323       result = DISPATCH_DISCARD;
00324     }
00325   ACE_CATCHANY
00326     {
00327       ACE_ERROR ( (LM_ERROR,
00328                    ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00329                              "(request) Caught unexpected exception "
00330                              "pushing event to consumer.\n"),
00331                    static_cast<int> (this->proxy ()->id ())
00332                    ));
00333       result = DISPATCH_DISCARD;
00334     }
00335   ACE_ENDTRY;
00336 
00337   // for persistent events that haven't timed out
00338   // convert "FAIL" & "DISCARD" to "RETRY"
00339   // for transient events, convert RETRY to DISCARD (hey, best_effort.)
00340   if (result == DISPATCH_FAIL || result == DISPATCH_DISCARD)
00341     {
00342       if (request->should_retry ())
00343         {
00344           result = DISPATCH_RETRY;
00345         }
00346     }
00347   else if (result == DISPATCH_RETRY)
00348     {
00349       if (! request->should_retry ())
00350         {
00351           result = DISPATCH_DISCARD;
00352         }
00353     }
00354 
00355   return result;
00356 }
00357 
00358 TAO_Notify_Consumer::DispatchStatus
00359 TAO_Notify_Consumer::dispatch_batch (const CosNotification::EventBatch& batch)
00360 {
00361   DispatchStatus result = DISPATCH_SUCCESS;
00362   ACE_DECLARE_NEW_ENV;
00363   ACE_TRY
00364     {
00365       this->push (batch ACE_ENV_ARG_PARAMETER);
00366       ACE_TRY_CHECK;
00367     }
00368   ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00369     {
00370       if (DEBUG_LEVEL  > 0)
00371         ACE_DEBUG ((LM_ERROR,
00372                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00373                               "%d::dispatch_batch() %s\n"),
00374                     static_cast<int> (this->proxy ()->id ()),
00375                     ex._info ().c_str ()
00376                     ));
00377       result = DISPATCH_FAIL;
00378     }
00379   ACE_CATCH (CORBA::TRANSIENT, ex)
00380     {
00381       if (DEBUG_LEVEL  > 0)
00382         ACE_DEBUG ((LM_ERROR,
00383                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00384                               "%d::dispatch_batch() Transient (minor=%d) %s\n"),
00385                     static_cast<int> (this->proxy ()->id ()),
00386                     ex.minor (),
00387                     ex._info ().c_str ()
00388                     ));
00389       const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00390       switch (ex.minor () & 0xfffff000u)
00391         {
00392         case CORBA::OMGVMCID:
00393           switch (ex.minor () & 0x00000fffu)
00394             {
00395             case 2: // No usable profile
00396             case 3: // Request cancelled
00397             case 4: // POA destroyed
00398               result = DISPATCH_FAIL;
00399               break;
00400             default:
00401               result = DISPATCH_DISCARD;
00402             }
00403           break;
00404 
00405         case TAO::VMCID:
00406         default:
00407           switch (ex.minor () & BITS_5_THRU_12_MASK)
00408             {
00409             case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00410               result = DISPATCH_FAIL;
00411               break;
00412             case TAO_POA_DISCARDING:
00413             case TAO_POA_HOLDING:
00414             default:
00415               result = DISPATCH_RETRY;
00416             } break;
00417         }
00418     }
00419   ACE_CATCH (CORBA::TIMEOUT, ex)
00420     {
00421       if (DEBUG_LEVEL  > 0)
00422         ACE_DEBUG ((LM_ERROR,
00423                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00424                               "%u::dispatch_batch() %s\n"),
00425                     this->proxy ()->id (),
00426                     ex._info().c_str ()
00427                     ));
00428       result = DISPATCH_FAIL;
00429     }
00430   ACE_CATCH (CORBA::COMM_FAILURE, ex)
00431     {
00432       if (DEBUG_LEVEL  > 0)
00433         ACE_DEBUG ((LM_ERROR,
00434                     ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00435                               "%u::dispatch_batch() %s\n"),
00436                     this->proxy ()->id (),
00437                     ex._info().c_str ()
00438                     ));
00439       result = DISPATCH_FAIL;
00440     }
00441   ACE_CATCH (CORBA::SystemException, ex)
00442     {
00443       if (DEBUG_LEVEL  > 0)
00444         {
00445           ACE_DEBUG ((LM_ERROR,
00446                       ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00447                                 "%d::dispatch_batch() SystemException %s\n"),
00448                       static_cast<int>  (this->proxy ()->id ()),
00449                       ex._info ().c_str ()
00450                       ));
00451         }
00452       result = DISPATCH_DISCARD;
00453     }
00454   ACE_CATCHANY
00455     {
00456       ACE_ERROR ((LM_ERROR,
00457                   ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00458                             "%d::dispatch_batch() Caught unexpected "
00459                             "exception pushing batch to consumer.\n"),
00460                   static_cast<int> (this->proxy ()->id ())
00461                   ));
00462       result = DISPATCH_DISCARD;
00463     }
00464   ACE_ENDTRY;
00465   return result;
00466 }
00467 
00468 void
00469 TAO_Notify_Consumer::dispatch_pending (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00470 {
00471   if (DEBUG_LEVEL  > 5)
00472     ACE_DEBUG ((LM_DEBUG,
00473                 ACE_TEXT ("Consumer %d dispatching pending events.  Queue size: %d\n"),
00474                 static_cast<int> (this->proxy ()->id ()),
00475                 this->pending_events().size ()
00476                 ));
00477 
00478   // lock ourselves in memory for the duration
00479   TAO_Notify_Consumer::Ptr self_grd (this);
00480 
00481   // dispatch events until: 1) the queue is empty; 2) the proxy shuts down, or 3) the dispatch fails
00482   ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00483   bool ok = true;
00484   while (ok
00485          && !this->proxy_supplier ()->has_shutdown ()
00486          && !this->pending_events().is_empty ())
00487     {
00488       if (! dispatch_from_queue ( this->pending_events(), ace_mon))
00489         {
00490           this->schedule_timer (true);
00491           ok = false;
00492         }
00493     }
00494 }
00495 
00496 
00497 // virtual: this is the default, overridden for SequencePushConsumer
00498 bool
00499 TAO_Notify_Consumer::dispatch_from_queue (Request_Queue & requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
00500 {
00501   bool result = true;
00502   TAO_Notify_Method_Request_Event_Queueable * request;
00503   if (requests.dequeue_head (request) == 0)
00504     {
00505       ace_mon.release ();
00506       DispatchStatus status = this->dispatch_request (request);
00507       switch (status)
00508         {
00509         case DISPATCH_SUCCESS:
00510           {
00511             request->complete ();
00512             request->release ();
00513             result = true;
00514             ace_mon.acquire ();
00515             break;
00516           }
00517         case DISPATCH_RETRY:
00518           {
00519             if (DEBUG_LEVEL  > 0)
00520               ACE_DEBUG ((LM_DEBUG,
00521                           ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00522                           static_cast<int> (this->proxy ()->id ()),
00523                           request->sequence ()
00524                           ));
00525             ace_mon.acquire ();
00526             requests.enqueue_head (request); // put the failed event back where it was
00527             result = false;
00528             break;
00529           }
00530         case DISPATCH_DISCARD:
00531           {
00532             if (DEBUG_LEVEL  > 0)
00533               ACE_DEBUG ((LM_DEBUG,
00534                           ACE_TEXT ("(%P|%t) Consumer %d: Error during "
00535                                     "dispatch. Discarding event:%d.\n"),
00536                           static_cast<int> (this->proxy ()->id ()),
00537                           request->sequence ()
00538                           ));
00539             request->complete ();
00540             ace_mon.acquire ();
00541             result = true;
00542             break;
00543           }
00544         case DISPATCH_FAIL:
00545           {
00546             if (DEBUG_LEVEL  > 0)
00547               ACE_DEBUG ((LM_DEBUG,
00548                           ACE_TEXT ("(%P|%t) Consumer %d: Failed. "
00549                                     "Discarding event %d.\n"),
00550                           static_cast<int> (this->proxy ()->id ()),
00551                           request->sequence ()
00552                           ));
00553             request->complete ();
00554             ace_mon.acquire ();
00555             while (requests.dequeue_head (request) == 0)
00556               {
00557                 ace_mon.release ();
00558                 request->complete ();
00559                 ace_mon.acquire ();
00560               }
00561             ace_mon.release ();
00562             ACE_DECLARE_NEW_ENV;
00563             ACE_TRY
00564               {
00565                 this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00566                 ACE_TRY_CHECK;
00567               }
00568             ACE_CATCHANY
00569               {
00570                 // todo is there something reasonable to do here?
00571               }
00572             ACE_ENDTRY;
00573             ace_mon.acquire ();
00574             result = true;
00575             break;
00576           }
00577         default:
00578           {
00579             ace_mon.acquire ();
00580             result = false;
00581             break;
00582           }
00583         }
00584     }
00585   return result;
00586 }
00587 
00588 //@@todo: rather than is_error, use pacing interval so it will be configurable
00589 //@@todo: find some way to use batch buffering stratgy for sequence consumers.
00590 void
00591 TAO_Notify_Consumer::schedule_timer (bool is_error)
00592 {
00593   if (this->timer_id_ != -1)
00594     {
00595       return; // We only want a single timeout scheduled.
00596     }
00597   // don't schedule timer if there's nothing that can be done
00598   if (this->is_suspended ())
00599     {
00600       return;
00601     }
00602 
00603   ACE_ASSERT (this->timer_.get() != 0);
00604 
00605   // If we're scheduling the timer due to an error then we want to
00606   // use the retry timeout, otherwise we'll assume that the pacing
00607   // interval is sufficient for now.
00608   ACE_Time_Value tv (DEFAULT_RETRY_TIMEOUT);
00609 
00610   if (! is_error)
00611     {
00612       if (this->pacing_.is_valid ())
00613         {
00614           tv = ORBSVCS_Time::to_Time_Value (this->pacing_.value ());
00615         }
00616     }
00617 
00618   if (DEBUG_LEVEL  > 5)
00619     {
00620       ACE_DEBUG ((LM_DEBUG,
00621                   ACE_TEXT ("Consumer %d: scheduling pacing/retry for %dms.\n"),
00622                   static_cast<int> (this->proxy ()->id ()), tv.msec ()));
00623     }
00624 
00625   this->timer_id_ =
00626     this->timer_->schedule_timer (this, tv, ACE_Time_Value::zero);
00627   if (this->timer_id_ == -1)
00628     {
00629       ACE_ERROR ((LM_ERROR,
00630                   ACE_TEXT ("TAO_Notify_Consumer %d::schedule_timer () "
00631                             "Error scheduling timer.\n"),
00632                   static_cast<int> (this->proxy ()->id ())
00633                   ));
00634     }
00635 }
00636 
00637 void
00638 TAO_Notify_Consumer::cancel_timer (void)
00639 {
00640   if (this->timer_.isSet() && this->timer_id_ != -1)
00641     {
00642       if (DEBUG_LEVEL  > 5)
00643         ACE_DEBUG ((LM_DEBUG,
00644                     ACE_TEXT ("Consumer %d canceling dispatch timer.\n"),
00645                     static_cast<int> (this->proxy ()->id ())
00646                     ));
00647 
00648       this->timer_->cancel_timer (timer_id_);
00649     }
00650   this->timer_id_ = -1;
00651 }
00652 
00653 int
00654 TAO_Notify_Consumer::handle_timeout (const ACE_Time_Value&, const void*)
00655 {
00656   TAO_Notify_Consumer::Ptr grd (this);
00657   this->timer_id_ = -1;  // This must come first, because dispatch_pending may try to resched
00658   ACE_DECLARE_NEW_ENV;
00659   ACE_TRY
00660     {
00661       this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
00662       ACE_TRY_CHECK;
00663     }
00664   ACE_CATCHALL
00665     {
00666     }
00667   ACE_ENDTRY;
00668 
00669   return 0;
00670 }
00671 
00672 void
00673 TAO_Notify_Consumer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00674 {
00675   if (this->timer_.isSet ())
00676     {
00677       this->cancel_timer ();
00678       this->timer_.reset ();
00679     }
00680 }
00681 
00682 void
00683 TAO_Notify_Consumer::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed
00684                                          ACE_ENV_ARG_DECL)
00685 {
00686   if (!CORBA::is_nil (this->publish_.in ()))
00687     this->publish_->offer_change (added, removed ACE_ENV_ARG_PARAMETER);
00688 }
00689 
00690 TAO_SYNCH_MUTEX*
00691 TAO_Notify_Consumer::proxy_lock (void)
00692 {
00693   return &this->proxy_->lock_;
00694 }
00695 
00696 TAO_Notify_ProxySupplier*
00697 TAO_Notify_Consumer::proxy_supplier (void)
00698 {
00699   return this->proxy_;
00700 }
00701 
00702 TAO_END_VERSIONED_NAMESPACE_DECL

Generated on Thu Nov 9 13:24:08 2006 for TAO_CosNotification by doxygen 1.3.6