00001
00002
00003 #include "orbsvcs/Notify/Consumer.h"
00004
00005 #if ! defined (__ACE_INLINE__)
00006 #include "orbsvcs/Notify/Consumer.inl"
00007 #endif
00008
00009 ACE_RCSID (RT_Notify, TAO_Notify_Consumer, "$Id: Consumer.cpp 84685 2009-03-02 22:49:17Z mesnier_p $")
00010
00011 #include "orbsvcs/Notify/Timer.h"
00012 #include "orbsvcs/Notify/ProxySupplier.h"
00013 #include "orbsvcs/Notify/Method_Request_Event.h"
00014 #include "orbsvcs/Notify/QoSProperties.h"
00015 #include "orbsvcs/Notify/Properties.h"
00016
00017 #include "orbsvcs/Time_Utilities.h"
00018
00019 #include "tao/debug.h"
00020 #include "tao/corba.h"
00021 #include "tao/Messaging/Messaging_TypesC.h"
00022
00023 #include "ace/Bound_Ptr.h"
00024 #include "ace/Unbounded_Queue.h"
00025
// Threshold consulted by every debug trace in this file.  Unless the build
// pre-defines DEBUG_LEVEL, it is an alias for TAO_debug_level.
#ifndef DEBUG_LEVEL
# define DEBUG_LEVEL TAO_debug_level
#endif //DEBUG_LEVEL

// Delay handed to ACE_Time_Value (as its seconds argument) by
// schedule_timer () when no pacing interval is used.
static const int DEFAULT_RETRY_TIMEOUT = 10;
00031
00032 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00033
00034 TAO_Notify_Consumer::TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy)
00035 : proxy_ (proxy)
00036 , is_suspended_ (0)
00037 , have_not_yet_verified_publish_ (true)
00038 , pacing_ (proxy->qos_properties_.pacing_interval ())
00039 , max_batch_size_ (CosNotification::MaximumBatchSize, 0)
00040 , timer_id_ (-1)
00041 , timer_ (0)
00042 {
00043 Request_Queue* pending_events = 0;
00044 ACE_NEW (pending_events, TAO_Notify_Consumer::Request_Queue ());
00045 this->pending_events_.reset( pending_events );
00046
00047 this->timer_.reset( this->proxy ()->timer () );
00048
00049
00050 this->reference_counting_policy ().value (
00051 ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
00052 }
00053
00054 TAO_Notify_Consumer::~TAO_Notify_Consumer ()
00055 {
00056 if (this->timer_.isSet())
00057 {
00058 this->cancel_timer ();
00059 this->timer_.reset ();
00060 }
00061 }
00062
00063 CORBA::ULong
00064 TAO_Notify_Consumer::_incr_refcnt (void)
00065 {
00066 return this->add_reference();
00067 }
00068
00069 CORBA::ULong
00070 TAO_Notify_Consumer::_decr_refcnt (void)
00071 {
00072 return this->remove_reference();
00073 }
00074
00075 TAO_Notify_Proxy*
00076 TAO_Notify_Consumer::proxy (void)
00077 {
00078 return this->proxy_supplier ();
00079 }
00080
00081 void
00082 TAO_Notify_Consumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties)
00083 {
00084 this->max_batch_size_ = qos_properties.maximum_batch_size ();
00085 }
00086
00087 void
00088 TAO_Notify_Consumer::resume (void)
00089 {
00090 this->is_suspended_ = 0;
00091
00092 this->dispatch_pending ();
00093 }
00094
00095 void
00096 TAO_Notify_Consumer::enqueue_request (
00097 TAO_Notify_Method_Request_Event * request)
00098 {
00099 TAO_Notify_Event::Ptr event (
00100 request->event ()->queueable_copy ());
00101
00102 TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00103 ACE_NEW_THROW_EX (queue_entry,
00104 TAO_Notify_Method_Request_Event_Queueable (*request, event),
00105 CORBA::NO_MEMORY ());
00106
00107 if (DEBUG_LEVEL > 3) ACE_DEBUG ( (LM_DEBUG,
00108 ACE_TEXT ("Consumer %d: enqueue_request (%d) @%@.\n"),
00109 static_cast<int> (this->proxy ()->id ()),
00110 request->sequence (),
00111 request
00112 ));
00113 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00114 this->pending_events().enqueue_tail (queue_entry);
00115 }
00116
00117 bool
00118 TAO_Notify_Consumer::enqueue_if_necessary (TAO_Notify_Method_Request_Event * request)
00119 {
00120 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock (), false);
00121 if (! this->pending_events().is_empty ())
00122 {
00123 if (DEBUG_LEVEL > 3)
00124 ACE_DEBUG ((LM_DEBUG,
00125 ACE_TEXT ("Consumer %d: enqueuing another event. %d\n"),
00126 static_cast<int> (this->proxy ()->id ()),
00127 request->sequence ()
00128 ));
00129 TAO_Notify_Event::Ptr event (
00130 request->event ()->queueable_copy ());
00131 TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00132 ACE_NEW_THROW_EX (queue_entry,
00133 TAO_Notify_Method_Request_Event_Queueable (*request,
00134 event),
00135 CORBA::NO_MEMORY ());
00136 this->pending_events().enqueue_tail (queue_entry);
00137 this->schedule_timer (false);
00138 return true;
00139 }
00140 if (this->is_suspended_ == 1)
00141 {
00142 if (DEBUG_LEVEL > 3)
00143 ACE_DEBUG ((LM_DEBUG,
00144 ACE_TEXT ("Suspended Consumer %d enqueing event. %d\n"),
00145 static_cast<int> (this->proxy ()->id ()),
00146 request->sequence ()
00147 ));
00148 TAO_Notify_Event::Ptr event (
00149 request->event ()->queueable_copy ());
00150 TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00151 ACE_NEW_THROW_EX (queue_entry,
00152 TAO_Notify_Method_Request_Event_Queueable (*request,
00153 event),
00154 CORBA::NO_MEMORY ());
00155 this->pending_events().enqueue_tail (queue_entry);
00156 this->schedule_timer (false);
00157 return true;
00158 }
00159 return false;
00160 }
00161
00162 void
00163 TAO_Notify_Consumer::deliver (TAO_Notify_Method_Request_Event * request)
00164 {
00165
00166
00167 TAO_Notify_Proxy::Ptr proxy_guard (this->proxy ());
00168 bool queued = enqueue_if_necessary (request);
00169 if (!queued)
00170 {
00171 bool from_timeout = false;
00172 DispatchStatus status = this->dispatch_request (request);
00173 switch (status)
00174 {
00175 case DISPATCH_SUCCESS:
00176 {
00177 request->complete ();
00178 break;
00179 }
00180 case DISPATCH_RETRY:
00181 {
00182 if (DEBUG_LEVEL > 1)
00183 ACE_DEBUG ((LM_DEBUG,
00184 ACE_TEXT ("Consumer %d enqueing event %d due ")
00185 ACE_TEXT ("to failed dispatch.\n"),
00186 static_cast<int> (this->proxy ()->id ()),
00187 request->sequence ()));
00188 this->enqueue_request (request);
00189 this->schedule_timer (true);
00190 break;
00191 }
00192 case DISPATCH_DISCARD:
00193 {
00194 if (DEBUG_LEVEL > 0)
00195 ACE_DEBUG ((LM_DEBUG,
00196 ACE_TEXT ("(%P|%t) Consumer %d: Error during ")
00197 ACE_TEXT ("direct dispatch. Discarding event:%d.\n"),
00198 static_cast<int> (this->proxy ()->id ()),
00199 request->sequence ()
00200 ));
00201 request->complete ();
00202 break;
00203 }
00204 case DISPATCH_FAIL_TIMEOUT:
00205 from_timeout = true;
00206
00207 case DISPATCH_FAIL:
00208 {
00209 if (DEBUG_LEVEL > 0)
00210 ACE_DEBUG ((LM_DEBUG,
00211 ACE_TEXT ("(%P|%t) Consumer %d: Failed during ")
00212 ACE_TEXT ("direct dispatch :%d. Discarding event.\n"),
00213 static_cast<int> (this->proxy ()->id ()),
00214 request->sequence ()
00215 ));
00216 request->complete ();
00217 try
00218 {
00219 this->proxy_supplier ()->destroy (from_timeout);
00220 }
00221 catch (const CORBA::Exception&)
00222 {
00223
00224 ;
00225 }
00226 break;
00227 }
00228 }
00229 }
00230 }
00231
00232 TAO_Notify_Consumer::DispatchStatus
00233 TAO_Notify_Consumer::dispatch_request (TAO_Notify_Method_Request_Event * request)
00234 {
00235 DispatchStatus result = DISPATCH_SUCCESS;
00236 try
00237 {
00238 request->event ()->push (this);
00239 if (DEBUG_LEVEL > 8)
00240 ACE_DEBUG ((LM_DEBUG,
00241 ACE_TEXT ("Consumer %d dispatched single event %d.\n"),
00242 static_cast<int> (this->proxy ()->id ()),
00243 request->sequence ()
00244 ));
00245 }
00246 catch (const CORBA::OBJECT_NOT_EXIST& ex)
00247 {
00248 if (DEBUG_LEVEL > 0)
00249 {
00250 ACE_DEBUG ((LM_ERROR,
00251 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push ")
00252 ACE_TEXT ("(request) %s\n"),
00253 static_cast<int> (this->proxy ()->id ()),
00254 ex._info ().c_str ()
00255 ));
00256 }
00257 result = DISPATCH_FAIL;
00258 }
00259 catch (const CORBA::TRANSIENT& ex)
00260 {
00261 if (DEBUG_LEVEL > 0)
00262 ACE_DEBUG ((LM_ERROR,
00263 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push ")
00264 ACE_TEXT ("(request) Transient (minor=%d) %s\n"),
00265 static_cast<int> (this->proxy ()->id ()),
00266 ex.minor (),
00267 ex._info ().c_str ()
00268 ));
00269 const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00270 switch (ex.minor () & 0xfffff000u)
00271 {
00272 case CORBA::OMGVMCID:
00273 switch (ex.minor () & 0x00000fffu)
00274 {
00275 case 2:
00276 case 3:
00277 case 4:
00278 result = DISPATCH_FAIL;
00279 break;
00280 default:
00281 result = DISPATCH_DISCARD;
00282 }
00283 break;
00284
00285 case TAO::VMCID:
00286 default:
00287 switch (ex.minor () & BITS_5_THRU_12_MASK)
00288 {
00289 case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00290 result = DISPATCH_FAIL;
00291 break;
00292 case TAO_POA_DISCARDING:
00293 case TAO_POA_HOLDING:
00294 default:
00295 result = DISPATCH_RETRY;
00296 } break;
00297 }
00298 }
00299 catch (const CORBA::TIMEOUT& ex)
00300 {
00301 if (DEBUG_LEVEL > 0)
00302 ACE_DEBUG ((LM_ERROR,
00303 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push ")
00304 ACE_TEXT ("(request) %s\n"),
00305 this->proxy ()->id (),
00306 ex._info().c_str ()
00307 ));
00308 result = DISPATCH_FAIL_TIMEOUT;
00309 }
00310 catch (const CORBA::COMM_FAILURE& ex)
00311 {
00312 if (DEBUG_LEVEL > 0)
00313 ACE_DEBUG ((LM_ERROR,
00314 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push ")
00315 ACE_TEXT ("(request) %s\n"),
00316 this->proxy ()->id (),
00317 ex._info().c_str ()
00318 ));
00319 result = DISPATCH_FAIL;
00320 }
00321 catch (const CORBA::SystemException& ex)
00322 {
00323 if (DEBUG_LEVEL > 0)
00324 {
00325 ACE_DEBUG ((LM_ERROR,
00326 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push ")
00327 ACE_TEXT ("(request) SystemException %s\n"),
00328 static_cast<int> (this->proxy ()->id ()),
00329 ex._info ().c_str ()
00330 ));
00331 }
00332 result = DISPATCH_DISCARD;
00333 }
00334 catch (const CORBA::Exception&)
00335 {
00336 ACE_ERROR ( (LM_ERROR,
00337 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push ")
00338 ACE_TEXT ("(request) Caught unexpected exception ")
00339 ACE_TEXT ("pushing event to consumer.\n"),
00340 static_cast<int> (this->proxy ()->id ())
00341 ));
00342 result = DISPATCH_DISCARD;
00343 }
00344
00345
00346
00347
00348 if (result == DISPATCH_FAIL ||
00349 result == DISPATCH_FAIL_TIMEOUT || result == DISPATCH_DISCARD)
00350 {
00351 if (request->should_retry ())
00352 {
00353 result = DISPATCH_RETRY;
00354 }
00355 }
00356 else if (result == DISPATCH_RETRY)
00357 {
00358 if (! request->should_retry ())
00359 {
00360 result = DISPATCH_DISCARD;
00361 }
00362 }
00363
00364 return result;
00365 }
00366
00367 TAO_Notify_Consumer::DispatchStatus
00368 TAO_Notify_Consumer::dispatch_batch (const CosNotification::EventBatch& batch)
00369 {
00370 DispatchStatus result = DISPATCH_SUCCESS;
00371 try
00372 {
00373 this->push (batch);
00374 }
00375 catch (const CORBA::OBJECT_NOT_EXIST& ex)
00376 {
00377 if (DEBUG_LEVEL > 0)
00378 ACE_DEBUG ((LM_ERROR,
00379 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer ")
00380 ACE_TEXT ("%d::dispatch_batch() %s\n"),
00381 static_cast<int> (this->proxy ()->id ()),
00382 ex._info ().c_str ()
00383 ));
00384 result = DISPATCH_FAIL;
00385 }
00386 catch (const CORBA::TRANSIENT& ex)
00387 {
00388 if (DEBUG_LEVEL > 0)
00389 ACE_DEBUG ((LM_ERROR,
00390 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer ")
00391 ACE_TEXT ("%d::dispatch_batch() Transient (minor=%d) %s\n"),
00392 static_cast<int> (this->proxy ()->id ()),
00393 ex.minor (),
00394 ex._info ().c_str ()
00395 ));
00396 const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00397 switch (ex.minor () & 0xfffff000u)
00398 {
00399 case CORBA::OMGVMCID:
00400 switch (ex.minor () & 0x00000fffu)
00401 {
00402 case 2:
00403 case 3:
00404 case 4:
00405 result = DISPATCH_FAIL;
00406 break;
00407 default:
00408 result = DISPATCH_DISCARD;
00409 }
00410 break;
00411
00412 case TAO::VMCID:
00413 default:
00414 switch (ex.minor () & BITS_5_THRU_12_MASK)
00415 {
00416 case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00417 result = DISPATCH_FAIL;
00418 break;
00419 case TAO_POA_DISCARDING:
00420 case TAO_POA_HOLDING:
00421 default:
00422 result = DISPATCH_RETRY;
00423 } break;
00424 }
00425 }
00426 catch (const CORBA::TIMEOUT& ex)
00427 {
00428 if (DEBUG_LEVEL > 0)
00429 ACE_DEBUG ((LM_ERROR,
00430 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer ")
00431 ACE_TEXT ("%u::dispatch_batch() %s\n"),
00432 this->proxy ()->id (),
00433 ex._info().c_str ()
00434 ));
00435 result = DISPATCH_FAIL_TIMEOUT;
00436 }
00437 catch (const CORBA::COMM_FAILURE& ex)
00438 {
00439 if (DEBUG_LEVEL > 0)
00440 ACE_DEBUG ((LM_ERROR,
00441 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer ")
00442 ACE_TEXT ("%u::dispatch_batch() %s\n"),
00443 this->proxy ()->id (),
00444 ex._info().c_str ()
00445 ));
00446 result = DISPATCH_FAIL;
00447 }
00448 catch (const CORBA::SystemException& ex)
00449 {
00450 if (DEBUG_LEVEL > 0)
00451 {
00452 ACE_DEBUG ((LM_ERROR,
00453 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer ")
00454 ACE_TEXT ("%d::dispatch_batch() SystemException %s\n"),
00455 static_cast<int> (this->proxy ()->id ()),
00456 ex._info ().c_str ()
00457 ));
00458 }
00459 result = DISPATCH_DISCARD;
00460 }
00461 catch (const CORBA::Exception&)
00462 {
00463 ACE_ERROR ((LM_ERROR,
00464 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer ")
00465 ACE_TEXT ("%d::dispatch_batch() Caught unexpected ")
00466 ACE_TEXT ("exception pushing batch to consumer.\n"),
00467 static_cast<int> (this->proxy ()->id ())
00468 ));
00469 result = DISPATCH_DISCARD;
00470 }
00471 return result;
00472 }
00473
00474 void
00475 TAO_Notify_Consumer::dispatch_pending (void)
00476 {
00477 if (DEBUG_LEVEL > 5)
00478 ACE_DEBUG ((LM_DEBUG,
00479 ACE_TEXT ("Consumer %d dispatching pending events. Queue size: %d\n"),
00480 static_cast<int> (this->proxy ()->id ()),
00481 this->pending_events().size ()
00482 ));
00483
00484
00485 TAO_Notify_Consumer::Ptr self_grd (this);
00486
00487
00488 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00489 bool ok = true;
00490 while (ok
00491 && !this->proxy_supplier ()->has_shutdown ()
00492 && !this->pending_events().is_empty ())
00493 {
00494 if (! dispatch_from_queue ( this->pending_events(), ace_mon))
00495 {
00496 this->schedule_timer (true);
00497 ok = false;
00498 }
00499 }
00500 }
00501
00502
00503
00504 bool
00505 TAO_Notify_Consumer::dispatch_from_queue (Request_Queue & requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
00506 {
00507 bool result = true;
00508 TAO_Notify_Method_Request_Event_Queueable * request;
00509 if (requests.dequeue_head (request) == 0)
00510 {
00511 ace_mon.release ();
00512 DispatchStatus status = this->dispatch_request (request);
00513 switch (status)
00514 {
00515 case DISPATCH_SUCCESS:
00516 {
00517 request->complete ();
00518 request->release ();
00519 result = true;
00520 ace_mon.acquire ();
00521 break;
00522 }
00523 case DISPATCH_RETRY:
00524 {
00525 if (DEBUG_LEVEL > 0)
00526 ACE_DEBUG ((LM_DEBUG,
00527 ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00528 static_cast<int> (this->proxy ()->id ()),
00529 request->sequence ()
00530 ));
00531 ace_mon.acquire ();
00532 requests.enqueue_head (request);
00533 result = false;
00534 break;
00535 }
00536 case DISPATCH_DISCARD:
00537 {
00538 if (DEBUG_LEVEL > 0)
00539 ACE_DEBUG ((LM_DEBUG,
00540 ACE_TEXT ("(%P|%t) Consumer %d: Error during ")
00541 ACE_TEXT ("dispatch. Discarding event:%d.\n"),
00542 static_cast<int> (this->proxy ()->id ()),
00543 request->sequence ()
00544 ));
00545 request->complete ();
00546 ace_mon.acquire ();
00547 result = true;
00548 break;
00549 }
00550 case DISPATCH_FAIL:
00551 {
00552 if (DEBUG_LEVEL > 0)
00553 ACE_DEBUG ((LM_DEBUG,
00554 ACE_TEXT ("(%P|%t) Consumer %d: Failed. ")
00555 ACE_TEXT ("Discarding event %d.\n"),
00556 static_cast<int> (this->proxy ()->id ()),
00557 request->sequence ()
00558 ));
00559 request->complete ();
00560 ace_mon.acquire ();
00561 while (requests.dequeue_head (request) == 0)
00562 {
00563 ace_mon.release ();
00564 request->complete ();
00565 ace_mon.acquire ();
00566 }
00567 ace_mon.release ();
00568 try
00569 {
00570 this->proxy_supplier ()->destroy ();
00571 }
00572 catch (const CORBA::Exception&)
00573 {
00574
00575 }
00576 ace_mon.acquire ();
00577 result = true;
00578 break;
00579 }
00580 default:
00581 {
00582 ace_mon.acquire ();
00583 result = false;
00584 break;
00585 }
00586 }
00587 }
00588 return result;
00589 }
00590
00591
00592
00593 void
00594 TAO_Notify_Consumer::schedule_timer (bool is_error)
00595 {
00596 if (this->timer_id_ != -1)
00597 {
00598 return;
00599 }
00600
00601 if (this->is_suspended ())
00602 {
00603 return;
00604 }
00605
00606 ACE_ASSERT (this->timer_.get() != 0);
00607
00608
00609
00610
00611 ACE_Time_Value tv (DEFAULT_RETRY_TIMEOUT);
00612
00613 if (! is_error)
00614 {
00615 if (this->pacing_.is_valid ())
00616 {
00617 tv = ORBSVCS_Time::to_Time_Value (this->pacing_.value ());
00618 }
00619 }
00620
00621 if (DEBUG_LEVEL > 5)
00622 {
00623 ACE_DEBUG ((LM_DEBUG,
00624 ACE_TEXT ("Consumer %d: scheduling pacing/retry for %dms.\n"),
00625 static_cast<int> (this->proxy ()->id ()), tv.msec ()));
00626 }
00627
00628 this->timer_id_ =
00629 this->timer_->schedule_timer (this, tv, ACE_Time_Value::zero);
00630 if (this->timer_id_ == -1)
00631 {
00632 ACE_ERROR ((LM_ERROR,
00633 ACE_TEXT ("TAO_Notify_Consumer %d::schedule_timer () ")
00634 ACE_TEXT ("Error scheduling timer.\n"),
00635 static_cast<int> (this->proxy ()->id ())
00636 ));
00637 }
00638 if (this->is_suspended())
00639 {
00640 this->cancel_timer();
00641 }
00642 }
00643
00644 void
00645 TAO_Notify_Consumer::cancel_timer (void)
00646 {
00647 if (this->timer_.isSet() && this->timer_id_ != -1)
00648 {
00649 if (DEBUG_LEVEL > 5)
00650 ACE_DEBUG ((LM_DEBUG,
00651 ACE_TEXT ("Consumer %d canceling dispatch timer.\n"),
00652 static_cast<int> (this->proxy ()->id ())
00653 ));
00654
00655 this->timer_->cancel_timer (timer_id_);
00656 }
00657 this->timer_id_ = -1;
00658 }
00659
00660 int
00661 TAO_Notify_Consumer::handle_timeout (const ACE_Time_Value&, const void*)
00662 {
00663 if (!this->is_suspended() && this->timer_.isSet() && this->timer_id_ != -1)
00664 {
00665 TAO_Notify_Consumer::Ptr grd (this);
00666 this->timer_id_ = -1;
00667 try
00668 {
00669 this->dispatch_pending ();
00670 }
00671 catch (...)
00672 {
00673 }
00674 }
00675
00676 return 0;
00677 }
00678
00679 void
00680 TAO_Notify_Consumer::shutdown (void)
00681 {
00682 this->suspend();
00683 if (this->timer_.isSet ())
00684 {
00685 this->cancel_timer ();
00686 this->timer_.reset ();
00687 }
00688 }
00689
00690 void
00691 TAO_Notify_Consumer::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed)
00692 {
00693 if (this->have_not_yet_verified_publish_)
00694 {
00695 this->have_not_yet_verified_publish_ = false;
00696 if (! this->publish_->_is_a ("IDL:omg.org/CosNotifyComm/NotifyPublish:1.0"))
00697 this->publish_ = CosNotifyComm::NotifyPublish::_nil();
00698 }
00699 if (! CORBA::is_nil (this->publish_.in ()))
00700 this->publish_->offer_change (added, removed);
00701 }
00702
00703 TAO_SYNCH_MUTEX*
00704 TAO_Notify_Consumer::proxy_lock (void)
00705 {
00706 return &this->proxy_->lock_;
00707 }
00708
00709 TAO_Notify_ProxySupplier*
00710 TAO_Notify_Consumer::proxy_supplier (void)
00711 {
00712 return this->proxy_;
00713 }
00714
00715 void
00716 TAO_Notify_Consumer::assume_pending_events (TAO_Notify_Consumer& rhs)
00717 {
00718
00719
00720
00721
00722 if (!rhs.pending_events ().is_empty ())
00723 {
00724
00725 this->pending_events_.reset (rhs.pending_events_.release ());
00726 if (rhs.timer_.isSet ())
00727 {
00728 rhs.cancel_timer ();
00729 }
00730
00731
00732
00733 this->schedule_timer ();
00734 }
00735 if (this->is_suspended())
00736 {
00737 this->cancel_timer();
00738 }
00739 }
00740
00741 bool
00742 TAO_Notify_Consumer::is_alive (bool allow_nil_consumer)
00743 {
00744 bool status = false;
00745 CORBA::Object_var consumer = this->get_consumer ();
00746 if (CORBA::is_nil (consumer.in ()))
00747 {
00748
00749
00750
00751
00752 if (allow_nil_consumer)
00753 return true;
00754 else
00755 return status;
00756 }
00757
00758 CORBA::PolicyList policy_list;
00759 try
00760 {
00761 bool do_liveliness_check = false;
00762 ACE_Time_Value now = ACE_OS::gettimeofday ();
00763
00764 if (CORBA::is_nil (this->rtt_obj_.in ()))
00765 {
00766
00767
00768
00769
00770
00771
00772
00773
00774 TimeBase::TimeT timeout = 10000000;
00775 CORBA::Any timeout_any;
00776 timeout_any <<= timeout;
00777
00778 policy_list.length (1);
00779 policy_list[0] = TAO_Notify_PROPERTIES::instance()->orb()->
00780 create_policy (
00781 Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
00782 timeout_any);
00783 rtt_obj_ =
00784 consumer->_set_policy_overrides (policy_list,
00785 CORBA::ADD_OVERRIDE);
00786
00787
00788 for (CORBA::ULong i = 0; i < policy_list.length (); i++)
00789 policy_list[i]->destroy ();
00790
00791 do_liveliness_check
00792 = (last_ping_ == ACE_Time_Value::zero ? true
00793 : now - last_ping_.value () >= TAO_Notify_PROPERTIES::instance()->validate_client_delay ());
00794 }
00795 else
00796 do_liveliness_check =
00797 now - last_ping_.value () >= TAO_Notify_PROPERTIES::instance()->validate_client_interval ();
00798
00799 if (CORBA::is_nil (rtt_obj_.in ()))
00800 status = false;
00801 else if (do_liveliness_check || allow_nil_consumer)
00802 {
00803 last_ping_ = now;
00804 status = !rtt_obj_->_non_existent ();
00805 }
00806 else
00807 status = true;
00808 }
00809 catch (CORBA::TIMEOUT&)
00810 {
00811 status = true;
00812 }
00813 catch (CORBA::Exception& ex)
00814 {
00815 if (DEBUG_LEVEL > 0)
00816 {
00817 ex._tao_print_exception ("TAO_Notify_Consumer::is_alive: false");
00818 }
00819 }
00820
00821 return status;
00822 }
00823
00824 TAO_END_VERSIONED_NAMESPACE_DECL