00001
00002
00003 #include "orbsvcs/Notify/Consumer.h"
00004
00005 #if ! defined (__ACE_INLINE__)
00006 #include "orbsvcs/Notify/Consumer.inl"
00007 #endif
00008
00009 ACE_RCSID (RT_Notify, TAO_Notify_Consumer, "Consumer.cpp,v 1.19 2006/03/14 06:14:34 jtc Exp")
00010
00011 #include "orbsvcs/Notify/Timer.h"
00012 #include "orbsvcs/Notify/ProxySupplier.h"
00013 #include "orbsvcs/Notify/Method_Request_Event.h"
00014
00015 #include "orbsvcs/Time_Utilities.h"
00016
00017 #include "tao/debug.h"
00018 #include "tao/corba.h"
00019
00020 #include "ace/Bound_Ptr.h"
00021 #include "ace/Unbounded_Queue.h"
00022
// Verbosity threshold used by every ACE_DEBUG guard in this file.
// Falls back to TAO_debug_level unless the build already defined it.
#if !defined (DEBUG_LEVEL)
# define DEBUG_LEVEL TAO_debug_level
#endif /* DEBUG_LEVEL */

namespace
{
  // Delay handed to the ACE_Time_Value constructor in schedule_timer()
  // when no pacing interval applies.
  // NOTE(review): presumably seconds -- confirm against ACE_Time_Value.
  const int DEFAULT_RETRY_TIMEOUT = 10;
}
00028
00029 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00030
00031 TAO_Notify_Consumer::TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy)
00032 : proxy_ (proxy)
00033 , is_suspended_ (0)
00034 , pacing_ (proxy->qos_properties_.pacing_interval ())
00035 , max_batch_size_ (CosNotification::MaximumBatchSize, 0)
00036 , timer_id_ (-1)
00037 , timer_ (0)
00038 {
00039 Request_Queue* pending_events = 0;
00040 ACE_NEW (pending_events, TAO_Notify_Consumer::Request_Queue ());
00041 this->pending_events_.reset( pending_events );
00042
00043 this->timer_.reset( this->proxy ()->timer () );
00044 }
00045
00046 TAO_Notify_Consumer::~TAO_Notify_Consumer ()
00047 {
00048 if (this->timer_.isSet())
00049 {
00050 this->cancel_timer ();
00051 this->timer_.reset ();
00052 }
00053 }
00054
00055 TAO_Notify_Proxy*
00056 TAO_Notify_Consumer::proxy (void)
00057 {
00058 return this->proxy_supplier ();
00059 }
00060
00061 void
00062 TAO_Notify_Consumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties)
00063 {
00064 this->max_batch_size_ = qos_properties.maximum_batch_size ();
00065 }
00066
00067 void
00068 TAO_Notify_Consumer::resume (ACE_ENV_SINGLE_ARG_DECL)
00069 {
00070 this->is_suspended_ = 0;
00071
00072 this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
00073 }
00074
00075 void
00076 TAO_Notify_Consumer::enqueue_request (
00077 TAO_Notify_Method_Request_Event * request
00078 ACE_ENV_ARG_DECL)
00079 {
00080 TAO_Notify_Event::Ptr event (
00081 request->event ()->queueable_copy (ACE_ENV_SINGLE_ARG_PARAMETER));
00082 ACE_CHECK;
00083
00084 TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00085 ACE_NEW_THROW_EX (queue_entry,
00086 TAO_Notify_Method_Request_Event_Queueable (*request, event),
00087 CORBA::NO_MEMORY ());
00088 ACE_CHECK;
00089
00090 if (DEBUG_LEVEL > 3) ACE_DEBUG ( (LM_DEBUG,
00091 ACE_TEXT ("Consumer %d: enqueue_request (%d) @%@.\n"),
00092 static_cast<int> (this->proxy ()->id ()),
00093 request->sequence (),
00094 request
00095 ));
00096 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00097 this->pending_events().enqueue_tail (queue_entry);
00098 }
00099
00100 bool
00101 TAO_Notify_Consumer::enqueue_if_necessary (TAO_Notify_Method_Request_Event * request ACE_ENV_ARG_DECL)
00102 {
00103 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock (), false);
00104 if (! this->pending_events().is_empty ())
00105 {
00106 if (DEBUG_LEVEL > 3)
00107 ACE_DEBUG ((LM_DEBUG,
00108 ACE_TEXT ("Consumer %d: enqueuing another event. %d\n"),
00109 static_cast<int> (this->proxy ()->id ()),
00110 request->sequence ()
00111 ));
00112 TAO_Notify_Event::Ptr event (
00113 request->event ()->queueable_copy (ACE_ENV_SINGLE_ARG_PARAMETER));
00114 ACE_CHECK_RETURN (false);
00115 TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00116 ACE_NEW_THROW_EX (queue_entry,
00117 TAO_Notify_Method_Request_Event_Queueable (*request,
00118 event),
00119 CORBA::NO_MEMORY ());
00120 ACE_CHECK_RETURN (false);
00121 this->pending_events().enqueue_tail (queue_entry);
00122 this->schedule_timer (false);
00123 return true;
00124 }
00125 if (this->is_suspended_ == 1)
00126 {
00127 if (DEBUG_LEVEL > 3)
00128 ACE_DEBUG ((LM_DEBUG,
00129 ACE_TEXT ("Suspended Consumer %d enqueing event. %d\n"),
00130 static_cast<int> (this->proxy ()->id ()),
00131 request->sequence ()
00132 ));
00133 TAO_Notify_Event::Ptr event (
00134 request->event ()->queueable_copy (ACE_ENV_SINGLE_ARG_PARAMETER));
00135 ACE_CHECK_RETURN (false);
00136 TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00137 ACE_NEW_THROW_EX (queue_entry,
00138 TAO_Notify_Method_Request_Event_Queueable (*request,
00139 event),
00140 CORBA::NO_MEMORY ());
00141 ACE_CHECK_RETURN (false);
00142 this->pending_events().enqueue_tail (queue_entry);
00143 this->schedule_timer (false);
00144 return true;
00145 }
00146 return false;
00147 }
00148
00149 void
00150 TAO_Notify_Consumer::deliver (TAO_Notify_Method_Request_Event * request
00151 ACE_ENV_ARG_DECL)
00152 {
00153
00154
00155 TAO_Notify_Proxy::Ptr proxy_guard (this->proxy ());
00156 bool queued = enqueue_if_necessary (request ACE_ENV_ARG_PARAMETER);
00157 ACE_CHECK;
00158 if (!queued)
00159 {
00160 DispatchStatus status = this->dispatch_request (request);
00161 switch (status)
00162 {
00163 case DISPATCH_SUCCESS:
00164 {
00165 request->complete ();
00166 break;
00167 }
00168 case DISPATCH_RETRY:
00169 {
00170 if (DEBUG_LEVEL > 1)
00171 ACE_DEBUG ((LM_DEBUG,
00172 ACE_TEXT ("Consumer %d enqueing event %d due ")
00173 ACE_TEXT ("to failed dispatch.\n"),
00174 static_cast<int> (this->proxy ()->id ()),
00175 request->sequence ()));
00176 this->enqueue_request (request ACE_ENV_ARG_PARAMETER);
00177 ACE_CHECK;
00178 this->schedule_timer (true);
00179 break;
00180 }
00181 case DISPATCH_DISCARD:
00182 {
00183 if (DEBUG_LEVEL > 0)
00184 ACE_DEBUG ((LM_DEBUG,
00185 ACE_TEXT ("(%P|%t) Consumer %d: Error during "
00186 "direct dispatch. Discarding event:%d.\n"),
00187 static_cast<int> (this->proxy ()->id ()),
00188 request->sequence ()
00189 ));
00190 request->complete ();
00191 break;
00192 }
00193 case DISPATCH_FAIL:
00194 {
00195 if (DEBUG_LEVEL > 0)
00196 ACE_DEBUG ((LM_DEBUG,
00197 ACE_TEXT ("(%P|%t) Consumer %d: Failed during "
00198 "direct dispatch :%d. Discarding event.\n"),
00199 static_cast<int> (this->proxy ()->id ()),
00200 request->sequence ()
00201 ));
00202 request->complete ();
00203 ACE_DECLARE_NEW_ENV;
00204 ACE_TRY
00205 {
00206 this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00207 ACE_TRY_CHECK;
00208 }
00209 ACE_CATCHANY
00210 {
00211
00212 ;
00213 }
00214 ACE_ENDTRY;
00215 break;
00216 }
00217 }
00218 }
00219 }
00220
00221 TAO_Notify_Consumer::DispatchStatus
00222 TAO_Notify_Consumer::dispatch_request (TAO_Notify_Method_Request_Event * request)
00223 {
00224 DispatchStatus result = DISPATCH_SUCCESS;
00225 ACE_DECLARE_NEW_ENV;
00226 ACE_TRY
00227 {
00228 request->event ()->push (this ACE_ENV_ARG_PARAMETER);
00229 ACE_TRY_CHECK;
00230 if (DEBUG_LEVEL > 8)
00231 ACE_DEBUG ((LM_DEBUG,
00232 ACE_TEXT ("Consumer %d dispatched single event %d.\n"),
00233 static_cast<int> (this->proxy ()->id ()),
00234 request->sequence ()
00235 ));
00236 }
00237 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00238 {
00239 if (DEBUG_LEVEL > 0)
00240 {
00241 ACE_DEBUG ((LM_ERROR,
00242 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00243 "(request) %s\n"),
00244 static_cast<int> (this->proxy ()->id ()),
00245 ex._info ().c_str ()
00246 ));
00247 }
00248 result = DISPATCH_FAIL;
00249 }
00250 ACE_CATCH (CORBA::TRANSIENT, ex)
00251 {
00252 if (DEBUG_LEVEL > 0)
00253 ACE_DEBUG ((LM_ERROR,
00254 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00255 "(request) Transient (minor=%d) %s\n"),
00256 static_cast<int> (this->proxy ()->id ()),
00257 ex.minor (),
00258 ex._info ().c_str ()
00259 ));
00260 const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00261 switch (ex.minor () & 0xfffff000u)
00262 {
00263 case CORBA::OMGVMCID:
00264 switch (ex.minor () & 0x00000fffu)
00265 {
00266 case 2:
00267 case 3:
00268 case 4:
00269 result = DISPATCH_FAIL;
00270 break;
00271 default:
00272 result = DISPATCH_DISCARD;
00273 }
00274 break;
00275
00276 case TAO::VMCID:
00277 default:
00278 switch (ex.minor () & BITS_5_THRU_12_MASK)
00279 {
00280 case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00281 result = DISPATCH_FAIL;
00282 break;
00283 case TAO_POA_DISCARDING:
00284 case TAO_POA_HOLDING:
00285 default:
00286 result = DISPATCH_RETRY;
00287 } break;
00288 }
00289 }
00290 ACE_CATCH (CORBA::TIMEOUT, ex)
00291 {
00292 if (DEBUG_LEVEL > 0)
00293 ACE_DEBUG ((LM_ERROR,
00294 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push "
00295 "(request) %s\n"),
00296 this->proxy ()->id (),
00297 ex._info().c_str ()
00298 ));
00299 result = DISPATCH_FAIL;
00300 }
00301 ACE_CATCH (CORBA::COMM_FAILURE, ex)
00302 {
00303 if (DEBUG_LEVEL > 0)
00304 ACE_DEBUG ((LM_ERROR,
00305 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push "
00306 "(request) %s\n"),
00307 this->proxy ()->id (),
00308 ex._info().c_str ()
00309 ));
00310 result = DISPATCH_FAIL;
00311 }
00312 ACE_CATCH (CORBA::SystemException, ex)
00313 {
00314 if (DEBUG_LEVEL > 0)
00315 {
00316 ACE_DEBUG ((LM_ERROR,
00317 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00318 "(request) SystemException %s\n"),
00319 static_cast<int> (this->proxy ()->id ()),
00320 ex._info ().c_str ()
00321 ));
00322 }
00323 result = DISPATCH_DISCARD;
00324 }
00325 ACE_CATCHANY
00326 {
00327 ACE_ERROR ( (LM_ERROR,
00328 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00329 "(request) Caught unexpected exception "
00330 "pushing event to consumer.\n"),
00331 static_cast<int> (this->proxy ()->id ())
00332 ));
00333 result = DISPATCH_DISCARD;
00334 }
00335 ACE_ENDTRY;
00336
00337
00338
00339
00340 if (result == DISPATCH_FAIL || result == DISPATCH_DISCARD)
00341 {
00342 if (request->should_retry ())
00343 {
00344 result = DISPATCH_RETRY;
00345 }
00346 }
00347 else if (result == DISPATCH_RETRY)
00348 {
00349 if (! request->should_retry ())
00350 {
00351 result = DISPATCH_DISCARD;
00352 }
00353 }
00354
00355 return result;
00356 }
00357
00358 TAO_Notify_Consumer::DispatchStatus
00359 TAO_Notify_Consumer::dispatch_batch (const CosNotification::EventBatch& batch)
00360 {
00361 DispatchStatus result = DISPATCH_SUCCESS;
00362 ACE_DECLARE_NEW_ENV;
00363 ACE_TRY
00364 {
00365 this->push (batch ACE_ENV_ARG_PARAMETER);
00366 ACE_TRY_CHECK;
00367 }
00368 ACE_CATCH (CORBA::OBJECT_NOT_EXIST, ex)
00369 {
00370 if (DEBUG_LEVEL > 0)
00371 ACE_DEBUG ((LM_ERROR,
00372 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00373 "%d::dispatch_batch() %s\n"),
00374 static_cast<int> (this->proxy ()->id ()),
00375 ex._info ().c_str ()
00376 ));
00377 result = DISPATCH_FAIL;
00378 }
00379 ACE_CATCH (CORBA::TRANSIENT, ex)
00380 {
00381 if (DEBUG_LEVEL > 0)
00382 ACE_DEBUG ((LM_ERROR,
00383 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00384 "%d::dispatch_batch() Transient (minor=%d) %s\n"),
00385 static_cast<int> (this->proxy ()->id ()),
00386 ex.minor (),
00387 ex._info ().c_str ()
00388 ));
00389 const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00390 switch (ex.minor () & 0xfffff000u)
00391 {
00392 case CORBA::OMGVMCID:
00393 switch (ex.minor () & 0x00000fffu)
00394 {
00395 case 2:
00396 case 3:
00397 case 4:
00398 result = DISPATCH_FAIL;
00399 break;
00400 default:
00401 result = DISPATCH_DISCARD;
00402 }
00403 break;
00404
00405 case TAO::VMCID:
00406 default:
00407 switch (ex.minor () & BITS_5_THRU_12_MASK)
00408 {
00409 case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00410 result = DISPATCH_FAIL;
00411 break;
00412 case TAO_POA_DISCARDING:
00413 case TAO_POA_HOLDING:
00414 default:
00415 result = DISPATCH_RETRY;
00416 } break;
00417 }
00418 }
00419 ACE_CATCH (CORBA::TIMEOUT, ex)
00420 {
00421 if (DEBUG_LEVEL > 0)
00422 ACE_DEBUG ((LM_ERROR,
00423 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00424 "%u::dispatch_batch() %s\n"),
00425 this->proxy ()->id (),
00426 ex._info().c_str ()
00427 ));
00428 result = DISPATCH_FAIL;
00429 }
00430 ACE_CATCH (CORBA::COMM_FAILURE, ex)
00431 {
00432 if (DEBUG_LEVEL > 0)
00433 ACE_DEBUG ((LM_ERROR,
00434 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00435 "%u::dispatch_batch() %s\n"),
00436 this->proxy ()->id (),
00437 ex._info().c_str ()
00438 ));
00439 result = DISPATCH_FAIL;
00440 }
00441 ACE_CATCH (CORBA::SystemException, ex)
00442 {
00443 if (DEBUG_LEVEL > 0)
00444 {
00445 ACE_DEBUG ((LM_ERROR,
00446 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00447 "%d::dispatch_batch() SystemException %s\n"),
00448 static_cast<int> (this->proxy ()->id ()),
00449 ex._info ().c_str ()
00450 ));
00451 }
00452 result = DISPATCH_DISCARD;
00453 }
00454 ACE_CATCHANY
00455 {
00456 ACE_ERROR ((LM_ERROR,
00457 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00458 "%d::dispatch_batch() Caught unexpected "
00459 "exception pushing batch to consumer.\n"),
00460 static_cast<int> (this->proxy ()->id ())
00461 ));
00462 result = DISPATCH_DISCARD;
00463 }
00464 ACE_ENDTRY;
00465 return result;
00466 }
00467
00468 void
00469 TAO_Notify_Consumer::dispatch_pending (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00470 {
00471 if (DEBUG_LEVEL > 5)
00472 ACE_DEBUG ((LM_DEBUG,
00473 ACE_TEXT ("Consumer %d dispatching pending events. Queue size: %d\n"),
00474 static_cast<int> (this->proxy ()->id ()),
00475 this->pending_events().size ()
00476 ));
00477
00478
00479 TAO_Notify_Consumer::Ptr self_grd (this);
00480
00481
00482 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00483 bool ok = true;
00484 while (ok
00485 && !this->proxy_supplier ()->has_shutdown ()
00486 && !this->pending_events().is_empty ())
00487 {
00488 if (! dispatch_from_queue ( this->pending_events(), ace_mon))
00489 {
00490 this->schedule_timer (true);
00491 ok = false;
00492 }
00493 }
00494 }
00495
00496
00497
00498 bool
00499 TAO_Notify_Consumer::dispatch_from_queue (Request_Queue & requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
00500 {
00501 bool result = true;
00502 TAO_Notify_Method_Request_Event_Queueable * request;
00503 if (requests.dequeue_head (request) == 0)
00504 {
00505 ace_mon.release ();
00506 DispatchStatus status = this->dispatch_request (request);
00507 switch (status)
00508 {
00509 case DISPATCH_SUCCESS:
00510 {
00511 request->complete ();
00512 request->release ();
00513 result = true;
00514 ace_mon.acquire ();
00515 break;
00516 }
00517 case DISPATCH_RETRY:
00518 {
00519 if (DEBUG_LEVEL > 0)
00520 ACE_DEBUG ((LM_DEBUG,
00521 ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00522 static_cast<int> (this->proxy ()->id ()),
00523 request->sequence ()
00524 ));
00525 ace_mon.acquire ();
00526 requests.enqueue_head (request);
00527 result = false;
00528 break;
00529 }
00530 case DISPATCH_DISCARD:
00531 {
00532 if (DEBUG_LEVEL > 0)
00533 ACE_DEBUG ((LM_DEBUG,
00534 ACE_TEXT ("(%P|%t) Consumer %d: Error during "
00535 "dispatch. Discarding event:%d.\n"),
00536 static_cast<int> (this->proxy ()->id ()),
00537 request->sequence ()
00538 ));
00539 request->complete ();
00540 ace_mon.acquire ();
00541 result = true;
00542 break;
00543 }
00544 case DISPATCH_FAIL:
00545 {
00546 if (DEBUG_LEVEL > 0)
00547 ACE_DEBUG ((LM_DEBUG,
00548 ACE_TEXT ("(%P|%t) Consumer %d: Failed. "
00549 "Discarding event %d.\n"),
00550 static_cast<int> (this->proxy ()->id ()),
00551 request->sequence ()
00552 ));
00553 request->complete ();
00554 ace_mon.acquire ();
00555 while (requests.dequeue_head (request) == 0)
00556 {
00557 ace_mon.release ();
00558 request->complete ();
00559 ace_mon.acquire ();
00560 }
00561 ace_mon.release ();
00562 ACE_DECLARE_NEW_ENV;
00563 ACE_TRY
00564 {
00565 this->proxy_supplier ()->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
00566 ACE_TRY_CHECK;
00567 }
00568 ACE_CATCHANY
00569 {
00570
00571 }
00572 ACE_ENDTRY;
00573 ace_mon.acquire ();
00574 result = true;
00575 break;
00576 }
00577 default:
00578 {
00579 ace_mon.acquire ();
00580 result = false;
00581 break;
00582 }
00583 }
00584 }
00585 return result;
00586 }
00587
00588
00589
00590 void
00591 TAO_Notify_Consumer::schedule_timer (bool is_error)
00592 {
00593 if (this->timer_id_ != -1)
00594 {
00595 return;
00596 }
00597
00598 if (this->is_suspended ())
00599 {
00600 return;
00601 }
00602
00603 ACE_ASSERT (this->timer_.get() != 0);
00604
00605
00606
00607
00608 ACE_Time_Value tv (DEFAULT_RETRY_TIMEOUT);
00609
00610 if (! is_error)
00611 {
00612 if (this->pacing_.is_valid ())
00613 {
00614 tv = ORBSVCS_Time::to_Time_Value (this->pacing_.value ());
00615 }
00616 }
00617
00618 if (DEBUG_LEVEL > 5)
00619 {
00620 ACE_DEBUG ((LM_DEBUG,
00621 ACE_TEXT ("Consumer %d: scheduling pacing/retry for %dms.\n"),
00622 static_cast<int> (this->proxy ()->id ()), tv.msec ()));
00623 }
00624
00625 this->timer_id_ =
00626 this->timer_->schedule_timer (this, tv, ACE_Time_Value::zero);
00627 if (this->timer_id_ == -1)
00628 {
00629 ACE_ERROR ((LM_ERROR,
00630 ACE_TEXT ("TAO_Notify_Consumer %d::schedule_timer () "
00631 "Error scheduling timer.\n"),
00632 static_cast<int> (this->proxy ()->id ())
00633 ));
00634 }
00635 }
00636
00637 void
00638 TAO_Notify_Consumer::cancel_timer (void)
00639 {
00640 if (this->timer_.isSet() && this->timer_id_ != -1)
00641 {
00642 if (DEBUG_LEVEL > 5)
00643 ACE_DEBUG ((LM_DEBUG,
00644 ACE_TEXT ("Consumer %d canceling dispatch timer.\n"),
00645 static_cast<int> (this->proxy ()->id ())
00646 ));
00647
00648 this->timer_->cancel_timer (timer_id_);
00649 }
00650 this->timer_id_ = -1;
00651 }
00652
00653 int
00654 TAO_Notify_Consumer::handle_timeout (const ACE_Time_Value&, const void*)
00655 {
00656 TAO_Notify_Consumer::Ptr grd (this);
00657 this->timer_id_ = -1;
00658 ACE_DECLARE_NEW_ENV;
00659 ACE_TRY
00660 {
00661 this->dispatch_pending (ACE_ENV_SINGLE_ARG_PARAMETER);
00662 ACE_TRY_CHECK;
00663 }
00664 ACE_CATCHALL
00665 {
00666 }
00667 ACE_ENDTRY;
00668
00669 return 0;
00670 }
00671
00672 void
00673 TAO_Notify_Consumer::shutdown (ACE_ENV_SINGLE_ARG_DECL_NOT_USED)
00674 {
00675 if (this->timer_.isSet ())
00676 {
00677 this->cancel_timer ();
00678 this->timer_.reset ();
00679 }
00680 }
00681
00682 void
00683 TAO_Notify_Consumer::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed
00684 ACE_ENV_ARG_DECL)
00685 {
00686 if (!CORBA::is_nil (this->publish_.in ()))
00687 this->publish_->offer_change (added, removed ACE_ENV_ARG_PARAMETER);
00688 }
00689
00690 TAO_SYNCH_MUTEX*
00691 TAO_Notify_Consumer::proxy_lock (void)
00692 {
00693 return &this->proxy_->lock_;
00694 }
00695
00696 TAO_Notify_ProxySupplier*
00697 TAO_Notify_Consumer::proxy_supplier (void)
00698 {
00699 return this->proxy_;
00700 }
00701
00702 TAO_END_VERSIONED_NAMESPACE_DECL