00001
00002
00003 #include "orbsvcs/Notify/Consumer.h"
00004
00005 #if ! defined (__ACE_INLINE__)
00006 #include "orbsvcs/Notify/Consumer.inl"
00007 #endif
00008
00009 ACE_RCSID (RT_Notify, TAO_Notify_Consumer, "$Id: Consumer.cpp 79324 2007-08-13 11:20:01Z elliott_c $")
00010
00011 #include "orbsvcs/Notify/Timer.h"
00012 #include "orbsvcs/Notify/ProxySupplier.h"
00013 #include "orbsvcs/Notify/Method_Request_Event.h"
00014
00015 #include "orbsvcs/Time_Utilities.h"
00016
00017 #include "tao/debug.h"
00018 #include "tao/corba.h"
00019
00020 #include "ace/Bound_Ptr.h"
00021 #include "ace/Unbounded_Queue.h"
00022
00023 #ifndef DEBUG_LEVEL
00024 # define DEBUG_LEVEL TAO_debug_level
00025 #endif //DEBUG_LEVEL
00026
00027 static const int DEFAULT_RETRY_TIMEOUT = 10;
00028
00029 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
00030
00031 TAO_Notify_Consumer::TAO_Notify_Consumer (TAO_Notify_ProxySupplier* proxy)
00032 : proxy_ (proxy)
00033 , is_suspended_ (0)
00034 , have_not_yet_verified_publish_ (true)
00035 , pacing_ (proxy->qos_properties_.pacing_interval ())
00036 , max_batch_size_ (CosNotification::MaximumBatchSize, 0)
00037 , timer_id_ (-1)
00038 , timer_ (0)
00039 {
00040 Request_Queue* pending_events = 0;
00041 ACE_NEW (pending_events, TAO_Notify_Consumer::Request_Queue ());
00042 this->pending_events_.reset( pending_events );
00043
00044 this->timer_.reset( this->proxy ()->timer () );
00045 }
00046
00047 TAO_Notify_Consumer::~TAO_Notify_Consumer ()
00048 {
00049 if (this->timer_.isSet())
00050 {
00051 this->cancel_timer ();
00052 this->timer_.reset ();
00053 }
00054 }
00055
00056 TAO_Notify_Proxy*
00057 TAO_Notify_Consumer::proxy (void)
00058 {
00059 return this->proxy_supplier ();
00060 }
00061
00062 void
00063 TAO_Notify_Consumer::qos_changed (const TAO_Notify_QoSProperties& qos_properties)
00064 {
00065 this->max_batch_size_ = qos_properties.maximum_batch_size ();
00066 }
00067
00068 void
00069 TAO_Notify_Consumer::resume (void)
00070 {
00071 this->is_suspended_ = 0;
00072
00073 this->dispatch_pending ();
00074 }
00075
00076 void
00077 TAO_Notify_Consumer::enqueue_request (
00078 TAO_Notify_Method_Request_Event * request)
00079 {
00080 TAO_Notify_Event::Ptr event (
00081 request->event ()->queueable_copy ());
00082
00083 TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00084 ACE_NEW_THROW_EX (queue_entry,
00085 TAO_Notify_Method_Request_Event_Queueable (*request, event),
00086 CORBA::NO_MEMORY ());
00087
00088 if (DEBUG_LEVEL > 3) ACE_DEBUG ( (LM_DEBUG,
00089 ACE_TEXT ("Consumer %d: enqueue_request (%d) @%@.\n"),
00090 static_cast<int> (this->proxy ()->id ()),
00091 request->sequence (),
00092 request
00093 ));
00094 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00095 this->pending_events().enqueue_tail (queue_entry);
00096 }
00097
00098 bool
00099 TAO_Notify_Consumer::enqueue_if_necessary (TAO_Notify_Method_Request_Event * request)
00100 {
00101 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock (), false);
00102 if (! this->pending_events().is_empty ())
00103 {
00104 if (DEBUG_LEVEL > 3)
00105 ACE_DEBUG ((LM_DEBUG,
00106 ACE_TEXT ("Consumer %d: enqueuing another event. %d\n"),
00107 static_cast<int> (this->proxy ()->id ()),
00108 request->sequence ()
00109 ));
00110 TAO_Notify_Event::Ptr event (
00111 request->event ()->queueable_copy ());
00112 TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00113 ACE_NEW_THROW_EX (queue_entry,
00114 TAO_Notify_Method_Request_Event_Queueable (*request,
00115 event),
00116 CORBA::NO_MEMORY ());
00117 this->pending_events().enqueue_tail (queue_entry);
00118 this->schedule_timer (false);
00119 return true;
00120 }
00121 if (this->is_suspended_ == 1)
00122 {
00123 if (DEBUG_LEVEL > 3)
00124 ACE_DEBUG ((LM_DEBUG,
00125 ACE_TEXT ("Suspended Consumer %d enqueing event. %d\n"),
00126 static_cast<int> (this->proxy ()->id ()),
00127 request->sequence ()
00128 ));
00129 TAO_Notify_Event::Ptr event (
00130 request->event ()->queueable_copy ());
00131 TAO_Notify_Method_Request_Event_Queueable * queue_entry;
00132 ACE_NEW_THROW_EX (queue_entry,
00133 TAO_Notify_Method_Request_Event_Queueable (*request,
00134 event),
00135 CORBA::NO_MEMORY ());
00136 this->pending_events().enqueue_tail (queue_entry);
00137 this->schedule_timer (false);
00138 return true;
00139 }
00140 return false;
00141 }
00142
00143 void
00144 TAO_Notify_Consumer::deliver (TAO_Notify_Method_Request_Event * request)
00145 {
00146
00147
00148 TAO_Notify_Proxy::Ptr proxy_guard (this->proxy ());
00149 bool queued = enqueue_if_necessary (request);
00150 if (!queued)
00151 {
00152 DispatchStatus status = this->dispatch_request (request);
00153 switch (status)
00154 {
00155 case DISPATCH_SUCCESS:
00156 {
00157 request->complete ();
00158 break;
00159 }
00160 case DISPATCH_RETRY:
00161 {
00162 if (DEBUG_LEVEL > 1)
00163 ACE_DEBUG ((LM_DEBUG,
00164 ACE_TEXT ("Consumer %d enqueing event %d due ")
00165 ACE_TEXT ("to failed dispatch.\n"),
00166 static_cast<int> (this->proxy ()->id ()),
00167 request->sequence ()));
00168 this->enqueue_request (request);
00169 this->schedule_timer (true);
00170 break;
00171 }
00172 case DISPATCH_DISCARD:
00173 {
00174 if (DEBUG_LEVEL > 0)
00175 ACE_DEBUG ((LM_DEBUG,
00176 ACE_TEXT ("(%P|%t) Consumer %d: Error during "
00177 "direct dispatch. Discarding event:%d.\n"),
00178 static_cast<int> (this->proxy ()->id ()),
00179 request->sequence ()
00180 ));
00181 request->complete ();
00182 break;
00183 }
00184 case DISPATCH_FAIL:
00185 {
00186 if (DEBUG_LEVEL > 0)
00187 ACE_DEBUG ((LM_DEBUG,
00188 ACE_TEXT ("(%P|%t) Consumer %d: Failed during "
00189 "direct dispatch :%d. Discarding event.\n"),
00190 static_cast<int> (this->proxy ()->id ()),
00191 request->sequence ()
00192 ));
00193 request->complete ();
00194 try
00195 {
00196 this->proxy_supplier ()->destroy ();
00197 }
00198 catch (const CORBA::Exception&)
00199 {
00200
00201 ;
00202 }
00203 break;
00204 }
00205 }
00206 }
00207 }
00208
00209 TAO_Notify_Consumer::DispatchStatus
00210 TAO_Notify_Consumer::dispatch_request (TAO_Notify_Method_Request_Event * request)
00211 {
00212 DispatchStatus result = DISPATCH_SUCCESS;
00213 try
00214 {
00215 request->event ()->push (this);
00216 if (DEBUG_LEVEL > 8)
00217 ACE_DEBUG ((LM_DEBUG,
00218 ACE_TEXT ("Consumer %d dispatched single event %d.\n"),
00219 static_cast<int> (this->proxy ()->id ()),
00220 request->sequence ()
00221 ));
00222 }
00223 catch (const CORBA::OBJECT_NOT_EXIST& ex)
00224 {
00225 if (DEBUG_LEVEL > 0)
00226 {
00227 ACE_DEBUG ((LM_ERROR,
00228 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00229 "(request) %s\n"),
00230 static_cast<int> (this->proxy ()->id ()),
00231 ex._info ().c_str ()
00232 ));
00233 }
00234 result = DISPATCH_FAIL;
00235 }
00236 catch (const CORBA::TRANSIENT& ex)
00237 {
00238 if (DEBUG_LEVEL > 0)
00239 ACE_DEBUG ((LM_ERROR,
00240 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00241 "(request) Transient (minor=%d) %s\n"),
00242 static_cast<int> (this->proxy ()->id ()),
00243 ex.minor (),
00244 ex._info ().c_str ()
00245 ));
00246 const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00247 switch (ex.minor () & 0xfffff000u)
00248 {
00249 case CORBA::OMGVMCID:
00250 switch (ex.minor () & 0x00000fffu)
00251 {
00252 case 2:
00253 case 3:
00254 case 4:
00255 result = DISPATCH_FAIL;
00256 break;
00257 default:
00258 result = DISPATCH_DISCARD;
00259 }
00260 break;
00261
00262 case TAO::VMCID:
00263 default:
00264 switch (ex.minor () & BITS_5_THRU_12_MASK)
00265 {
00266 case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00267 result = DISPATCH_FAIL;
00268 break;
00269 case TAO_POA_DISCARDING:
00270 case TAO_POA_HOLDING:
00271 default:
00272 result = DISPATCH_RETRY;
00273 } break;
00274 }
00275 }
00276 catch (const CORBA::TIMEOUT& ex)
00277 {
00278 if (DEBUG_LEVEL > 0)
00279 ACE_DEBUG ((LM_ERROR,
00280 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push "
00281 "(request) %s\n"),
00282 this->proxy ()->id (),
00283 ex._info().c_str ()
00284 ));
00285 result = DISPATCH_FAIL;
00286 }
00287 catch (const CORBA::COMM_FAILURE& ex)
00288 {
00289 if (DEBUG_LEVEL > 0)
00290 ACE_DEBUG ((LM_ERROR,
00291 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push "
00292 "(request) %s\n"),
00293 this->proxy ()->id (),
00294 ex._info().c_str ()
00295 ));
00296 result = DISPATCH_FAIL;
00297 }
00298 catch (const CORBA::SystemException& ex)
00299 {
00300 if (DEBUG_LEVEL > 0)
00301 {
00302 ACE_DEBUG ((LM_ERROR,
00303 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00304 "(request) SystemException %s\n"),
00305 static_cast<int> (this->proxy ()->id ()),
00306 ex._info ().c_str ()
00307 ));
00308 }
00309 result = DISPATCH_DISCARD;
00310 }
00311 catch (const CORBA::Exception&)
00312 {
00313 ACE_ERROR ( (LM_ERROR,
00314 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push "
00315 "(request) Caught unexpected exception "
00316 "pushing event to consumer.\n"),
00317 static_cast<int> (this->proxy ()->id ())
00318 ));
00319 result = DISPATCH_DISCARD;
00320 }
00321
00322
00323
00324
00325 if (result == DISPATCH_FAIL || result == DISPATCH_DISCARD)
00326 {
00327 if (request->should_retry ())
00328 {
00329 result = DISPATCH_RETRY;
00330 }
00331 }
00332 else if (result == DISPATCH_RETRY)
00333 {
00334 if (! request->should_retry ())
00335 {
00336 result = DISPATCH_DISCARD;
00337 }
00338 }
00339
00340 return result;
00341 }
00342
00343 TAO_Notify_Consumer::DispatchStatus
00344 TAO_Notify_Consumer::dispatch_batch (const CosNotification::EventBatch& batch)
00345 {
00346 DispatchStatus result = DISPATCH_SUCCESS;
00347 try
00348 {
00349 this->push (batch);
00350 }
00351 catch (const CORBA::OBJECT_NOT_EXIST& ex)
00352 {
00353 if (DEBUG_LEVEL > 0)
00354 ACE_DEBUG ((LM_ERROR,
00355 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00356 "%d::dispatch_batch() %s\n"),
00357 static_cast<int> (this->proxy ()->id ()),
00358 ex._info ().c_str ()
00359 ));
00360 result = DISPATCH_FAIL;
00361 }
00362 catch (const CORBA::TRANSIENT& ex)
00363 {
00364 if (DEBUG_LEVEL > 0)
00365 ACE_DEBUG ((LM_ERROR,
00366 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00367 "%d::dispatch_batch() Transient (minor=%d) %s\n"),
00368 static_cast<int> (this->proxy ()->id ()),
00369 ex.minor (),
00370 ex._info ().c_str ()
00371 ));
00372 const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
00373 switch (ex.minor () & 0xfffff000u)
00374 {
00375 case CORBA::OMGVMCID:
00376 switch (ex.minor () & 0x00000fffu)
00377 {
00378 case 2:
00379 case 3:
00380 case 4:
00381 result = DISPATCH_FAIL;
00382 break;
00383 default:
00384 result = DISPATCH_DISCARD;
00385 }
00386 break;
00387
00388 case TAO::VMCID:
00389 default:
00390 switch (ex.minor () & BITS_5_THRU_12_MASK)
00391 {
00392 case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
00393 result = DISPATCH_FAIL;
00394 break;
00395 case TAO_POA_DISCARDING:
00396 case TAO_POA_HOLDING:
00397 default:
00398 result = DISPATCH_RETRY;
00399 } break;
00400 }
00401 }
00402 catch (const CORBA::TIMEOUT& ex)
00403 {
00404 if (DEBUG_LEVEL > 0)
00405 ACE_DEBUG ((LM_ERROR,
00406 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00407 "%u::dispatch_batch() %s\n"),
00408 this->proxy ()->id (),
00409 ex._info().c_str ()
00410 ));
00411 result = DISPATCH_FAIL;
00412 }
00413 catch (const CORBA::COMM_FAILURE& ex)
00414 {
00415 if (DEBUG_LEVEL > 0)
00416 ACE_DEBUG ((LM_ERROR,
00417 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00418 "%u::dispatch_batch() %s\n"),
00419 this->proxy ()->id (),
00420 ex._info().c_str ()
00421 ));
00422 result = DISPATCH_FAIL;
00423 }
00424 catch (const CORBA::SystemException& ex)
00425 {
00426 if (DEBUG_LEVEL > 0)
00427 {
00428 ACE_DEBUG ((LM_ERROR,
00429 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00430 "%d::dispatch_batch() SystemException %s\n"),
00431 static_cast<int> (this->proxy ()->id ()),
00432 ex._info ().c_str ()
00433 ));
00434 }
00435 result = DISPATCH_DISCARD;
00436 }
00437 catch (const CORBA::Exception&)
00438 {
00439 ACE_ERROR ((LM_ERROR,
00440 ACE_TEXT ("(%P|%t) TAO_Notify_Consumer "
00441 "%d::dispatch_batch() Caught unexpected "
00442 "exception pushing batch to consumer.\n"),
00443 static_cast<int> (this->proxy ()->id ())
00444 ));
00445 result = DISPATCH_DISCARD;
00446 }
00447 return result;
00448 }
00449
00450 void
00451 TAO_Notify_Consumer::dispatch_pending (void)
00452 {
00453 if (DEBUG_LEVEL > 5)
00454 ACE_DEBUG ((LM_DEBUG,
00455 ACE_TEXT ("Consumer %d dispatching pending events. Queue size: %d\n"),
00456 static_cast<int> (this->proxy ()->id ()),
00457 this->pending_events().size ()
00458 ));
00459
00460
00461 TAO_Notify_Consumer::Ptr self_grd (this);
00462
00463
00464 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
00465 bool ok = true;
00466 while (ok
00467 && !this->proxy_supplier ()->has_shutdown ()
00468 && !this->pending_events().is_empty ())
00469 {
00470 if (! dispatch_from_queue ( this->pending_events(), ace_mon))
00471 {
00472 this->schedule_timer (true);
00473 ok = false;
00474 }
00475 }
00476 }
00477
00478
00479
00480 bool
00481 TAO_Notify_Consumer::dispatch_from_queue (Request_Queue & requests, ACE_Guard <TAO_SYNCH_MUTEX> & ace_mon)
00482 {
00483 bool result = true;
00484 TAO_Notify_Method_Request_Event_Queueable * request;
00485 if (requests.dequeue_head (request) == 0)
00486 {
00487 ace_mon.release ();
00488 DispatchStatus status = this->dispatch_request (request);
00489 switch (status)
00490 {
00491 case DISPATCH_SUCCESS:
00492 {
00493 request->complete ();
00494 request->release ();
00495 result = true;
00496 ace_mon.acquire ();
00497 break;
00498 }
00499 case DISPATCH_RETRY:
00500 {
00501 if (DEBUG_LEVEL > 0)
00502 ACE_DEBUG ((LM_DEBUG,
00503 ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
00504 static_cast<int> (this->proxy ()->id ()),
00505 request->sequence ()
00506 ));
00507 ace_mon.acquire ();
00508 requests.enqueue_head (request);
00509 result = false;
00510 break;
00511 }
00512 case DISPATCH_DISCARD:
00513 {
00514 if (DEBUG_LEVEL > 0)
00515 ACE_DEBUG ((LM_DEBUG,
00516 ACE_TEXT ("(%P|%t) Consumer %d: Error during "
00517 "dispatch. Discarding event:%d.\n"),
00518 static_cast<int> (this->proxy ()->id ()),
00519 request->sequence ()
00520 ));
00521 request->complete ();
00522 ace_mon.acquire ();
00523 result = true;
00524 break;
00525 }
00526 case DISPATCH_FAIL:
00527 {
00528 if (DEBUG_LEVEL > 0)
00529 ACE_DEBUG ((LM_DEBUG,
00530 ACE_TEXT ("(%P|%t) Consumer %d: Failed. "
00531 "Discarding event %d.\n"),
00532 static_cast<int> (this->proxy ()->id ()),
00533 request->sequence ()
00534 ));
00535 request->complete ();
00536 ace_mon.acquire ();
00537 while (requests.dequeue_head (request) == 0)
00538 {
00539 ace_mon.release ();
00540 request->complete ();
00541 ace_mon.acquire ();
00542 }
00543 ace_mon.release ();
00544 try
00545 {
00546 this->proxy_supplier ()->destroy ();
00547 }
00548 catch (const CORBA::Exception&)
00549 {
00550
00551 }
00552 ace_mon.acquire ();
00553 result = true;
00554 break;
00555 }
00556 default:
00557 {
00558 ace_mon.acquire ();
00559 result = false;
00560 break;
00561 }
00562 }
00563 }
00564 return result;
00565 }
00566
00567
00568
00569 void
00570 TAO_Notify_Consumer::schedule_timer (bool is_error)
00571 {
00572 if (this->timer_id_ != -1)
00573 {
00574 return;
00575 }
00576
00577 if (this->is_suspended ())
00578 {
00579 return;
00580 }
00581
00582 ACE_ASSERT (this->timer_.get() != 0);
00583
00584
00585
00586
00587 ACE_Time_Value tv (DEFAULT_RETRY_TIMEOUT);
00588
00589 if (! is_error)
00590 {
00591 if (this->pacing_.is_valid ())
00592 {
00593 tv = ORBSVCS_Time::to_Time_Value (this->pacing_.value ());
00594 }
00595 }
00596
00597 if (DEBUG_LEVEL > 5)
00598 {
00599 ACE_DEBUG ((LM_DEBUG,
00600 ACE_TEXT ("Consumer %d: scheduling pacing/retry for %dms.\n"),
00601 static_cast<int> (this->proxy ()->id ()), tv.msec ()));
00602 }
00603
00604 this->timer_id_ =
00605 this->timer_->schedule_timer (this, tv, ACE_Time_Value::zero);
00606 if (this->timer_id_ == -1)
00607 {
00608 ACE_ERROR ((LM_ERROR,
00609 ACE_TEXT ("TAO_Notify_Consumer %d::schedule_timer () "
00610 "Error scheduling timer.\n"),
00611 static_cast<int> (this->proxy ()->id ())
00612 ));
00613 }
00614 }
00615
00616 void
00617 TAO_Notify_Consumer::cancel_timer (void)
00618 {
00619 if (this->timer_.isSet() && this->timer_id_ != -1)
00620 {
00621 if (DEBUG_LEVEL > 5)
00622 ACE_DEBUG ((LM_DEBUG,
00623 ACE_TEXT ("Consumer %d canceling dispatch timer.\n"),
00624 static_cast<int> (this->proxy ()->id ())
00625 ));
00626
00627 this->timer_->cancel_timer (timer_id_);
00628 }
00629 this->timer_id_ = -1;
00630 }
00631
00632 int
00633 TAO_Notify_Consumer::handle_timeout (const ACE_Time_Value&, const void*)
00634 {
00635 TAO_Notify_Consumer::Ptr grd (this);
00636 this->timer_id_ = -1;
00637 try
00638 {
00639 this->dispatch_pending ();
00640 }
00641 catch (...)
00642 {
00643 }
00644
00645 return 0;
00646 }
00647
00648 void
00649 TAO_Notify_Consumer::shutdown (void)
00650 {
00651 if (this->timer_.isSet ())
00652 {
00653 this->cancel_timer ();
00654 this->timer_.reset ();
00655 }
00656 }
00657
00658 void
00659 TAO_Notify_Consumer::dispatch_updates_i (const CosNotification::EventTypeSeq& added, const CosNotification::EventTypeSeq& removed)
00660 {
00661 if (this->have_not_yet_verified_publish_)
00662 {
00663 this->have_not_yet_verified_publish_ = false;
00664 if (! this->publish_->_is_a ("IDL:omg.org/CosNotifyComm/NotifyPublish:1.0"))
00665 this->publish_ = CosNotifyComm::NotifyPublish::_nil();
00666 }
00667 if (! CORBA::is_nil (this->publish_.in ()))
00668 this->publish_->offer_change (added, removed);
00669 }
00670
00671 TAO_SYNCH_MUTEX*
00672 TAO_Notify_Consumer::proxy_lock (void)
00673 {
00674 return &this->proxy_->lock_;
00675 }
00676
00677 TAO_Notify_ProxySupplier*
00678 TAO_Notify_Consumer::proxy_supplier (void)
00679 {
00680 return this->proxy_;
00681 }
00682
00683 void
00684 TAO_Notify_Consumer::assume_pending_events (TAO_Notify_Consumer& rhs)
00685 {
00686
00687
00688
00689
00690 if (!rhs.pending_events ().is_empty ())
00691 {
00692
00693 this->pending_events_.reset (rhs.pending_events_.release ());
00694 if (rhs.timer_.isSet ())
00695 {
00696 rhs.cancel_timer ();
00697 }
00698
00699
00700
00701 this->schedule_timer ();
00702 }
00703 }
00704
00705 TAO_END_VERSIONED_NAMESPACE_DECL