#include <Routing_Slip.h>
Inheritance diagram for TAO_Notify::Routing_Slip:
Interacts with persistent storage to provide reliable delivery.
Definition at line 66 of file Routing_Slip.h.
|
|
A mini-state machine to control persistence. See external doc for circles and arrows.
Definition at line 183 of file Routing_Slip.h.
00184 { 00185 rssCREATING, 00186 rssTRANSIENT, 00187 rssRELOADED, 00188 rssNEW, 00189 rssCOMPLETE_WHILE_NEW, 00190 rssSAVING, 00191 rssSAVED, 00192 rssUPDATING, 00193 rssCHANGED_WHILE_SAVING, 00194 rssCHANGED, 00195 rssCOMPLETE, 00196 rssDELETING, 00197 rssTERMINAL 00198 } state_; |
|
Destructor (should be private, but that inspires compiler wars).
Definition at line 185 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, and LM_DEBUG.
00186 { 00187 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00188 ACE_TEXT ("(%P|%t) Routing Slip #%d: destructor\n"), 00189 this->sequence_ 00190 )); 00191 } |
|
Private constructor for use by create method.
Definition at line 167 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, routing_slip_sequence_, sequence_, and sequence_lock_.
00169 : is_safe_ (false) 00170 , until_safe_ (internals_) 00171 , this_ptr_ (0) 00172 , event_(event) 00173 , state_ (rssCREATING) 00174 , complete_requests_ (0) 00175 , rspm_ (0) 00176 { 00177 Routing_Slip_Guard guard (sequence_lock_); 00178 this->sequence_ = ++routing_slip_sequence_; 00179 if (DEBUG_LEVEL > 1) ACE_DEBUG ((LM_DEBUG, 00180 ACE_TEXT ("(%P|%t) Routing Slip #%d: constructor\n"), 00181 this->sequence_ 00182 )); 00183 } |
|
This routing_slip needs to be saved.
Definition at line 582 of file Routing_Slip.cpp. References TAO_Notify::Routing_Slip_Queue::add(), persistent_queue_, ACE_Guard< ACE_LOCK >::release(), and Routing_Slip_Guard. Referenced by enter_state_changed(), and enter_state_new().
00583 { 00584 guard.release (); 00585 this->persistent_queue_.add (this->this_ptr_); 00586 } |
|
Test to see if all deliveries are complete.
Definition at line 576 of file Routing_Slip.cpp. References complete_requests_, delivery_requests_, and ACE_Vector< T, DEFAULT_SIZE >::size(). Referenced by continue_state_changed(), continue_state_new(), continue_state_transient(), enter_state_changed(), and enter_state_transient().
00577 { 00578 return this->complete_requests_ == this->delivery_requests_.size (); 00579 } |
|
This Routing_Slip reached the front of the persistence queue.
Definition at line 453 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), TAO_Notify::Routing_Slip_Queue::complete(), DEBUG_LEVEL, enter_state_deleting(), enter_state_saving(), enter_state_terminal(), enter_state_updating(), LM_DEBUG, LM_ERROR, persistent_queue_, ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rssCHANGED, rssCOMPLETE, rssCOMPLETE_WHILE_NEW, and rssNEW.
00454 { 00455 Routing_Slip_Guard guard (this->internals_); 00456 State state = this->state_; 00457 switch (state) 00458 { 00459 case rssNEW: 00460 { 00461 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00462 ACE_TEXT ("(%P|%t) Routing Slip #%d: NEW Reached front of queue\n"), 00463 this->sequence_ 00464 )); 00465 enter_state_saving (guard); 00466 break; 00467 } 00468 case rssCOMPLETE_WHILE_NEW: 00469 { 00470 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00471 ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE_WHILE_NEW Reached front of queue\n"), 00472 this->sequence_ 00473 )); 00474 guard.release (); 00475 this->persistent_queue_.complete (); 00476 enter_state_terminal (guard); 00477 break; 00478 } 00479 case rssCHANGED: 00480 { 00481 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00482 ACE_TEXT ("(%P|%t) Routing Slip #%d: CHANGED Reached front of queue\n"), 00483 this->sequence_ 00484 )); 00485 enter_state_updating (guard); 00486 break; 00487 } 00488 case rssCOMPLETE: 00489 { 00490 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00491 ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE Reached front of queue\n"), 00492 this->sequence_ 00493 )); 00494 enter_state_deleting (guard); 00495 break; 00496 } 00497 default: 00498 { 00499 ACE_ERROR ((LM_ERROR, 00500 ACE_TEXT ("(%P|%t) Routing Slip %d: Unexpected at_front_of_persist_queue in state %d\n"), 00501 this->sequence_, 00502 static_cast<int> (this->state_) 00503 )); 00504 break; 00505 } 00506 } 00507 } |
|
Definition at line 783 of file Routing_Slip.cpp. References all_deliveries_complete(), count_continue_changed_, enter_state_complete(), ACE_Guard< ACE_LOCK >::release(), and Routing_Slip_Guard. Referenced by delivery_request_complete().
00784 { 00785 ++count_continue_changed_; 00786 if (all_deliveries_complete ()) 00787 { 00788 enter_state_complete (guard); 00789 } 00790 else 00791 { 00792 guard.release (); 00793 } 00794 } |
|
Definition at line 758 of file Routing_Slip.cpp. References ACE_Guard< ACE_LOCK >::release(), and Routing_Slip_Guard. Referenced by delivery_request_complete().
00759 {
00760 // no action necessary
00761 guard.release ();
00762 }
|
|
Definition at line 604 of file Routing_Slip.cpp. References all_deliveries_complete(), count_continue_new_, enter_state_complete_while_new(), ACE_Guard< ACE_LOCK >::release(), and Routing_Slip_Guard. Referenced by delivery_request_complete().
00605 { 00606 ++count_continue_new_; 00607 if (all_deliveries_complete ()) 00608 { 00609 this->enter_state_complete_while_new (guard); 00610 } 00611 guard.release (); 00612 } |
|
Definition at line 668 of file Routing_Slip.cpp. References all_deliveries_complete(), count_continue_transient_, enter_state_terminal(), ACE_Guard< ACE_LOCK >::release(), and Routing_Slip_Guard. Referenced by delivery_request_complete().
00669 { 00670 ++count_continue_transient_; 00671 if (all_deliveries_complete ()) 00672 { 00673 enter_state_terminal (guard); 00674 } 00675 else 00676 { 00677 guard.release (); 00678 } 00679 } |
|
"Factory" method for use during reload from persistent storage.
Definition at line 108 of file Routing_Slip.cpp. References ACE_ERROR, ACE_TEXT(), create(), event(), LM_ERROR, TAO_Notify::Routing_Slip_Persistence_Manager::reload(), ACE_Strong_Bound_Ptr< X, ACE_LOCK >::reset(), and TAO_Notify::Routing_Slip_Ptr.
00111 { 00112 Routing_Slip_Ptr result; 00113 ACE_Message_Block * event_mb = 0; 00114 ACE_Message_Block * rs_mb = 0; 00115 try 00116 { 00117 if (rspm->reload (event_mb, rs_mb)) 00118 { 00119 TAO_InputCDR cdr_event (event_mb); 00120 TAO_Notify_Event::Ptr event (TAO_Notify_Event::unmarshal (cdr_event)); 00121 if (event.isSet()) 00122 { 00123 result = create (event); 00124 TAO_InputCDR cdr_rs (rs_mb); 00125 if ( result->unmarshal (ecf, cdr_rs)) 00126 { 00127 result->set_rspm (rspm); 00128 } 00129 else 00130 { 00131 ACE_ERROR ((LM_ERROR, 00132 ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for routing slip.\n") 00133 )); 00134 result.reset (); 00135 } 00136 } 00137 else 00138 { 00139 ACE_ERROR ((LM_ERROR, 00140 ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for event.\n") 00141 )); 00142 } 00143 } 00144 } 00145 catch (const CORBA::Exception&) 00146 { 00147 ACE_ERROR ((LM_ERROR, 00148 ACE_TEXT ("(%P|%t) Routing_Slip::create: Exception reloading event.\n") 00149 )); 00150 } 00151 delete event_mb; 00152 delete rs_mb; 00153 00154 return result; 00155 } |
|
"Factory" method for normal use.
Definition at line 57 of file Routing_Slip.cpp. References ACE_ERROR, ACE_NEW_THROW_EX, ACE_TEXT(), count_continue_changed_, count_continue_changed_while_saving_, count_continue_new_, count_continue_transient_, count_enter_changed_, count_enter_changed_while_saving_, count_enter_complete_, count_enter_complete_while_new_, count_enter_deleting_, count_enter_new_, count_enter_reloaded_, count_enter_saved_, count_enter_saving_, count_enter_terminal_, count_enter_transient_, count_enter_updating_, DEBUG_LEVEL, LM_ERROR, and TAO_Notify::Routing_Slip_Ptr. Referenced by create(), TAO_Notify_EventChannelFactory::load_event_persistence(), and TAO_Notify_ProxyConsumer::push_i().
00058 { 00059 Routing_Slip * prs; 00060 ACE_NEW_THROW_EX (prs, Routing_Slip (event), CORBA::NO_MEMORY ()); 00061 Routing_Slip_Ptr result(prs); 00062 result->this_ptr_ = result; // let the pointers touch so they use the same ref count 00063 00064 // note we don't care about ultra-precise stats, so no guard for these 00065 if (DEBUG_LEVEL > 8 && ((result->sequence_ % 100) == 0)) 00066 { 00067 ACE_ERROR ((LM_ERROR, 00068 ACE_TEXT ("(%P|%t) Routing_Slip_Statistics\n") 00069 ACE_TEXT (" enter_transient \t%d\n") 00070 ACE_TEXT (" continue_transient \t%d\n") 00071 ACE_TEXT (" enter_reloaded \t%d\n") 00072 ACE_TEXT (" enter_new \t%d\n") 00073 ACE_TEXT (" continue_new \t%d\n") 00074 ACE_TEXT (" enter_complete_while_new \t%d\n") 00075 ACE_TEXT (" enter_saving \t%d\n") 00076 ACE_TEXT (" enter_saved \t%d\n") 00077 ACE_TEXT (" enter_updating \t%d\n") 00078 ACE_TEXT (" enter_changed_while_saving \t%d\n") 00079 ACE_TEXT (" continue_changed_while_saving\t%d\n") 00080 ACE_TEXT (" enter_changed \t%d\n") 00081 ACE_TEXT (" continue_changed \t%d\n") 00082 ACE_TEXT (" enter_complete \t%d\n") 00083 ACE_TEXT (" enter_deleting \t%d\n") 00084 ACE_TEXT (" enter_terminal \t%d\n") 00085 , static_cast<int> (count_enter_transient_) 00086 , static_cast<int> (count_continue_transient_) 00087 , static_cast<int> (count_enter_reloaded_) 00088 , static_cast<int> (count_enter_new_) 00089 , static_cast<int> (count_continue_new_) 00090 , static_cast<int> (count_enter_complete_while_new_) 00091 , static_cast<int> (count_enter_saving_) 00092 , static_cast<int> (count_enter_saved_) 00093 , static_cast<int> (count_enter_updating_) 00094 , static_cast<int> (count_enter_changed_while_saving_) 00095 , static_cast<int> (count_continue_changed_while_saving_) 00096 , static_cast<int> (count_enter_changed_) 00097 , static_cast<int> (count_continue_changed_) 00098 , static_cast<int> (count_enter_complete_) 00099 , static_cast<int> (count_enter_deleting_) 00100 , static_cast<int> (count_enter_terminal_) 00101 )); 
00102 } 00103 return result; 00104 } |
|
Definition at line 194 of file Routing_Slip.cpp. References TAO_Notify::Event_Persistence_Factory::create_routing_slip_persistence_manager(), TAO_Notify::Event_Persistence_Strategy::get_factory(), rspm_, and set_rspm(). Referenced by enter_state_saving().
00195 { 00196 if (this->rspm_ == 0) 00197 { 00198 Event_Persistence_Strategy * strategy = 00199 ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence"); 00200 if (strategy != 0) 00201 { 00202 Event_Persistence_Factory * factory = strategy->get_factory (); 00203 if (factory != 0) 00204 { 00205 set_rspm (factory->create_routing_slip_persistence_manager(this)); 00206 } 00207 } 00208 } 00209 return this->rspm_ != 0; 00210 } |
|
A delivery request has been satisfied.
Definition at line 388 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, ACE_TEXT(), complete_requests_, continue_state_changed(), continue_state_changed_while_saving(), continue_state_new(), continue_state_transient(), DEBUG_LEVEL, delivery_requests_, enter_state_changed(), enter_state_changed_while_saving(), LM_DEBUG, LM_ERROR, Routing_Slip_Guard, rssCHANGED, rssCHANGED_WHILE_SAVING, rssNEW, rssSAVED, rssSAVING, rssTRANSIENT, rssUPDATING, and ACE_Vector< T, DEFAULT_SIZE >::size().
00389 { 00390 Routing_Slip_Guard guard (this->internals_); 00391 ACE_ASSERT (request_id < this->delivery_requests_.size ()); 00392 // reset the pointer to allow the delivery_request to be deleted. 00393 this->delivery_requests_[request_id].reset (); 00394 this->complete_requests_ += 1; 00395 00396 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00397 ACE_TEXT ("(%P|%t) Routing Slip #%d: delivery_request_complete #%d: completed %d of %d\n"), 00398 this->sequence_, 00399 static_cast<int> (request_id), 00400 static_cast<int> (this->complete_requests_), 00401 static_cast<int> (this->delivery_requests_.size ()) 00402 )); 00403 State state = this->state_; 00404 switch (state) 00405 { 00406 case rssTRANSIENT: 00407 { 00408 continue_state_transient (guard); 00409 break; 00410 } 00411 case rssNEW: 00412 { 00413 continue_state_new (guard); 00414 break; 00415 } 00416 case rssSAVING: 00417 { 00418 enter_state_changed_while_saving (guard); 00419 break; 00420 } 00421 case rssUPDATING: 00422 { 00423 enter_state_changed_while_saving (guard); 00424 break; 00425 } 00426 case rssSAVED: 00427 { 00428 enter_state_changed (guard); 00429 break; 00430 } 00431 case rssCHANGED_WHILE_SAVING: 00432 { 00433 continue_state_changed_while_saving (guard); 00434 break; 00435 } 00436 case rssCHANGED: 00437 { 00438 continue_state_changed (guard); 00439 break; 00440 } 00441 default: 00442 { 00443 ACE_ERROR ((LM_ERROR, 00444 ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected delivery_request_complete in state %d\n"), 00445 static_cast<int> (this->state_) 00446 )); 00447 break; 00448 } 00449 } 00450 } |
|
Schedule delivery to a consumer via a proxy supplier.
Definition at line 336 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, TAO_Notify::Delivery_Request_Ptr, delivery_requests_, TAO_Notify_Object::execute_task(), TAO_Notify_Object::has_shutdown(), TAO_Notify_Object::id(), LM_DEBUG, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rssCREATING, and ACE_Vector< T, DEFAULT_SIZE >::size().
00339 { 00340 // cannot be the first action 00341 ACE_ASSERT (this->state_ != rssCREATING); 00342 00343 TAO_Notify_ProxySupplier::Ptr psgrd(ps); 00344 Routing_Slip_Guard guard (this->internals_); 00345 00346 size_t request_id = delivery_requests_.size (); 00347 00348 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00349 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Dispatch %s; completed %d of %d\n"), 00350 this->sequence_, 00351 static_cast<int> (request_id), 00352 filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"), 00353 static_cast<int> (this->complete_requests_), 00354 static_cast<int> (this->delivery_requests_.size ()) 00355 )); 00356 00357 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id)); 00358 if (! ps->has_shutdown() ) 00359 { 00360 this->delivery_requests_.push_back (request); 00361 TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter); 00362 guard.release (); 00363 if (DEBUG_LEVEL > 8) 00364 ACE_DEBUG ((LM_DEBUG, 00365 "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to " 00366 "proxy supplier %d\n", 00367 this->sequence_, 00368 static_cast<int> (request_id), 00369 ps->id())); 00370 ps->execute_task (method); 00371 } 00372 else 00373 { 00374 if (DEBUG_LEVEL > 5) 00375 ACE_DEBUG ((LM_DEBUG, 00376 "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to " 00377 "proxy supplier %d; already shut down\n", 00378 this->sequence_, 00379 static_cast<int> (request_id), 00380 ps->id())); 00381 } 00382 } |
|
Definition at line 765 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), add_to_persist_queue(), all_deliveries_complete(), count_enter_changed_, DEBUG_LEVEL, enter_state_complete(), LM_DEBUG, Routing_Slip_Guard, and rssCHANGED. Referenced by delivery_request_complete(), and persist_complete().
00766 { 00767 ++count_enter_changed_; 00768 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00769 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED\n"), 00770 this->sequence_ 00771 )); 00772 // complete state change BEFORE initiating request to avoid 00773 // race condition if request finishes before state is stable. 00774 this->state_ = rssCHANGED; 00775 if (all_deliveries_complete ()) 00776 { 00777 enter_state_complete (guard); 00778 } 00779 add_to_persist_queue (guard); 00780 } |
|
Definition at line 746 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_changed_while_saving_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, and rssCHANGED_WHILE_SAVING. Referenced by delivery_request_complete().
00747 { 00748 ++count_enter_changed_while_saving_; 00749 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00750 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED_WHILE_SAVING\n"), 00751 this->sequence_ 00752 )); 00753 this->state_ = rssCHANGED_WHILE_SAVING; 00754 guard.release (); 00755 } |
|
Definition at line 797 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_complete_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, and rssCOMPLETE. Referenced by continue_state_changed(), and enter_state_changed().
00798 { 00799 ++count_enter_complete_; 00800 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00801 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE\n"), 00802 this->sequence_ 00803 )); 00804 this->state_ = rssCOMPLETE; 00805 guard.release (); 00806 } |
|
Definition at line 614 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_complete_while_new_, DEBUG_LEVEL, is_safe_, LM_DEBUG, Routing_Slip_Guard, rssCOMPLETE_WHILE_NEW, and until_safe_. Referenced by continue_state_new().
00615 { 00616 ++count_enter_complete_while_new_; 00617 ACE_UNUSED_ARG (guard); 00618 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00619 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE_WHILE_NEW\n"), 00620 this->sequence_ 00621 )); 00622 // allow the ConsumerProxy to return from the CORBA push call. 00623 if (! is_safe_) 00624 { 00625 is_safe_ = true; 00626 this->until_safe_.signal (); 00627 } 00628 this->state_ = rssCOMPLETE_WHILE_NEW; 00629 } |
|
Definition at line 809 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_deleting_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), TAO_Notify::Routing_Slip_Persistence_Manager::remove(), Routing_Slip_Guard, rspm_, and rssDELETING. Referenced by at_front_of_persist_queue().
00810 { 00811 ++count_enter_deleting_; 00812 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00813 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state DELETING\n"), 00814 this->sequence_ 00815 )); 00816 this->state_ = rssDELETING; 00817 guard.release (); 00818 this->rspm_->remove (); 00819 } |
|
Definition at line 592 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), add_to_persist_queue(), count_enter_new_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssNEW. Referenced by route().
00593 { 00594 ++count_enter_new_; 00595 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00596 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state NEW\n"), 00597 this->sequence_ 00598 )); 00599 this->state_ = rssNEW; 00600 add_to_persist_queue(guard); 00601 } |
|
Definition at line 632 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_reloaded_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, and rssRELOADED.
00633 { 00634 ++count_enter_reloaded_; 00635 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00636 ACE_TEXT ("(%P|%t) Routing Slip #&d: enter state RELOADED\n"), 00637 this->sequence_ 00638 )); 00639 this->state_ = rssRELOADED; 00640 guard.release(); 00641 } |
|
Definition at line 714 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_saved_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, and rssSAVED. Referenced by persist_complete(), and reconnect().
00715 { 00716 ++count_enter_saved_; 00717 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00718 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVED\n"), 00719 this->sequence_ 00720 )); 00721 this->state_ = rssSAVED; 00722 guard.release (); 00723 } |
|
Definition at line 681 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), ACE_OutputCDR::begin(), TAO_Notify::Routing_Slip_Queue::complete(), count_enter_saving_, create_persistence_manager(), DEBUG_LEVEL, enter_state_transient(), LM_DEBUG, marshal(), persistent_queue_, ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rspm_, rssSAVING, and TAO_Notify::Routing_Slip_Persistence_Manager::store(). Referenced by at_front_of_persist_queue().
00682 { 00683 ++count_enter_saving_; 00684 if (!create_persistence_manager ()) 00685 { 00686 // Note This should actually be a throw (out of memory) 00687 // but we cheat and make this a transient event. 00688 guard.release (); 00689 this->persistent_queue_.complete (); 00690 enter_state_transient (guard); 00691 } 00692 else 00693 { 00694 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00695 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVING\n"), 00696 this->sequence_ 00697 )); 00698 this->state_ = rssSAVING; 00699 00700 TAO_OutputCDR event_cdr; 00701 this->event_->marshal (event_cdr); 00702 00703 const ACE_Message_Block *event_mb = event_cdr.begin (); 00704 TAO_OutputCDR rs_cdr; 00705 marshal (rs_cdr); 00706 const ACE_Message_Block *rs_mb = rs_cdr.begin (); 00707 00708 guard.release (); 00709 this->rspm_->store (*event_mb, *rs_mb); 00710 } 00711 } |
|
Definition at line 822 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), count_enter_terminal_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), ACE_Strong_Bound_Ptr< X, ACE_LOCK >::reset(), Routing_Slip_Guard, rssTERMINAL, and this_ptr_. Referenced by at_front_of_persist_queue(), continue_state_transient(), enter_state_transient(), and persist_complete().
00823 { 00824 ++count_enter_terminal_; 00825 ACE_ASSERT( this->is_safe_); 00826 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00827 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TERMINAL\n"), 00828 this->sequence_ 00829 )); 00830 this->state_ = rssTERMINAL; 00831 this->this_ptr_.reset (); 00832 guard.release (); 00833 } |
|
Definition at line 644 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), all_deliveries_complete(), count_enter_transient_, DEBUG_LEVEL, enter_state_terminal(), is_safe_, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rssTRANSIENT, and until_safe_. Referenced by enter_state_saving(), and route().
00645 { 00646 ++count_enter_transient_; 00647 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00648 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TRANSIENT\n"), 00649 this->sequence_ 00650 )); 00651 this->state_ = rssTRANSIENT; 00652 if (! is_safe_) 00653 { 00654 is_safe_ = true; 00655 this->until_safe_.signal (); 00656 } 00657 if (all_deliveries_complete ()) 00658 { 00659 enter_state_terminal (guard); 00660 } 00661 else 00662 { 00663 guard.release (); 00664 } 00665 } |
|
Definition at line 726 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), ACE_OutputCDR::begin(), count_enter_updating_, DEBUG_LEVEL, LM_DEBUG, marshal(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rspm_, rssUPDATING, and TAO_Notify::Routing_Slip_Persistence_Manager::update(). Referenced by at_front_of_persist_queue().
00727 { 00728 ++count_enter_updating_; 00729 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00730 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state UPDATING\n"), 00731 this->sequence_ 00732 )); 00733 this->state_ = rssUPDATING; 00734 00735 TAO_OutputCDR rs_cdr; 00736 marshal (rs_cdr); 00737 const ACE_Message_Block *rs_mb = rs_cdr.begin (); 00738 guard.release (); 00739 00740 ACE_ASSERT (this->rspm_ != 0); 00741 this->rspm_->update (*rs_mb); 00742 } |
|
Definition at line 213 of file Routing_Slip.cpp. Referenced by create().
00214 { 00215 return this->event_; 00216 } |
|
Marshal into a CDR.
Definition at line 836 of file Routing_Slip.cpp. References delivery_requests_, ACE_Array_Base< T >::get(), TAO_Notify::Delivery_Request::marshal(), ACE_Vector< T, DEFAULT_SIZE >::size(), and ACE_OutputCDR::write_ulong(). Referenced by enter_state_saving(), and enter_state_updating().
00837 { 00838 size_t request_count = this->delivery_requests_.size(); 00839 cdr.write_ulong ( 00840 ACE_Utils::truncate_cast<CORBA::ULong> (request_count - this->complete_requests_)); 00841 for (size_t nreq = 0; nreq < request_count; ++nreq) 00842 { 00843 Delivery_Request * request = this->delivery_requests_[nreq].get (); 00844 if (request != 0) 00845 { 00846 request->marshal (cdr); 00847 } 00848 } 00849 } |
|
The persistent storage has completed the last request.
Implements TAO_Notify::Persistent_Callback. Definition at line 510 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, ACE_TEXT(), TAO_Notify::Routing_Slip_Queue::complete(), DEBUG_LEVEL, enter_state_changed(), enter_state_saved(), enter_state_terminal(), is_safe_, LM_DEBUG, LM_ERROR, ACE_Guard< ACE_LOCK >::locked(), persistent_queue_, ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, TAO_Notify::Routing_Slip_Ptr, rssCHANGED_WHILE_SAVING, rssDELETING, rssSAVING, rssUPDATING, and until_safe_.
00511 { 00512 // keep this object around til this method returns. 00513 Routing_Slip_Ptr me(this->this_ptr_); 00514 Routing_Slip_Guard guard (this->internals_); 00515 ACE_ASSERT (guard.locked ()); 00516 00517 // allow the ConsumerProxy to return from the CORBA push call. 00518 if (! is_safe_) 00519 { 00520 is_safe_ = true; 00521 this->until_safe_.signal (); 00522 } 00523 00524 State state = this->state_; 00525 switch (state) 00526 { 00527 case rssSAVING: 00528 { 00529 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00530 ACE_TEXT ("(%P|%t) Routing Slip #%d: SAVING persist complete\n"), 00531 this->sequence_ 00532 )); 00533 enter_state_saved(guard); 00534 break; 00535 } 00536 case rssCHANGED_WHILE_SAVING: 00537 { 00538 enter_state_changed (guard); 00539 break; 00540 } 00541 case rssUPDATING: 00542 { 00543 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00544 ACE_TEXT ("(%P|%t) Routing Slip #%d: UPDATING persist complete\n"), 00545 this->sequence_ 00546 )); 00547 enter_state_saved (guard); 00548 break; 00549 } 00550 case rssDELETING: 00551 { 00552 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00553 ACE_TEXT ("(%P|%t) Routing Slip #%d: DELETING persist complete\n"), 00554 this->sequence_ 00555 )); 00556 enter_state_terminal (guard); 00557 break; 00558 } 00559 default: 00560 { 00561 ACE_ERROR ((LM_ERROR, 00562 ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected transition in state %d\n"), 00563 static_cast<int> (this->state_) 00564 )); 00565 guard.release (); 00566 break; 00567 } 00568 } 00569 this->persistent_queue_.complete (); 00570 } |
|
Definition at line 908 of file Routing_Slip.cpp. References ACE_Vector< T, DEFAULT_SIZE >::clear(), delivery_methods_, enter_state_saved(), Routing_Slip_Guard, and ACE_Vector< T, DEFAULT_SIZE >::size().
00909 { 00910 Routing_Slip_Guard guard (this->internals_); 00911 enter_state_saved (guard); 00912 00913 //@@todo is there a worker_task available to do this? 00914 size_t count = this->delivery_methods_.size (); 00915 for (size_t nmethod = 0; nmethod < count; ++nmethod) 00916 { 00917 this->delivery_methods_[nmethod]->execute (); 00918 } 00919 this->delivery_methods_.clear (); 00920 } |
|
Route this event to destinations. This must be the first action requested after the routing slip is created. Definition at line 229 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, TAO_Notify::Delivery_Request_Ptr, delivery_requests_, enter_state_new(), enter_state_transient(), TAO_Notify_Object::execute_task(), LM_DEBUG, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rssCREATING, and ACE_Vector< T, DEFAULT_SIZE >::size().
00230 { 00231 ACE_ASSERT(pc != 0); 00232 00233 TAO_Notify_ProxyConsumer::Ptr pcgrd(pc); 00234 00235 Routing_Slip_Guard guard (this->internals_); 00236 00237 size_t request_id = delivery_requests_.size (); 00238 00239 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00240 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: lookup, completed %d of %d\n"), 00241 this->sequence_, 00242 static_cast<int> (request_id), 00243 static_cast<int> (this->complete_requests_), 00244 static_cast<int> (this->delivery_requests_.size ()) 00245 )); 00246 00247 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id)); 00248 this->delivery_requests_.push_back (request); 00249 TAO_Notify_Method_Request_Lookup_Queueable method (request, pc); 00250 00251 if (this->state_ == rssCREATING) 00252 { 00253 if (! reliable_channel) 00254 { 00255 enter_state_transient (guard); 00256 } 00257 else if (ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence") == 0) 00258 { 00259 enter_state_transient (guard); 00260 } 00261 else if (! this->event_->reliable().is_valid()) 00262 { 00263 enter_state_new (guard); 00264 } 00265 else if (this->event_->reliable().value() == true) 00266 { 00267 enter_state_new (guard); 00268 } 00269 else 00270 { 00271 enter_state_transient (guard); 00272 } 00273 } 00274 else 00275 { 00276 // We only need to release the guard if the state is rssCREATING. 00277 // By calling enter_state_*, we are guaranteed that the guard has 00278 // been released. 00279 guard.release (); 00280 } 00281 pc->execute_task (method); 00282 } |
|
Provide an identifying number for this Routing Slip to use in debug messages.
Definition at line 923 of file Routing_Slip.cpp. References sequence_.
00924 { 00925 return this->sequence_; 00926 } |
|
Definition at line 158 of file Routing_Slip.cpp. References rspm_, and TAO_Notify::Routing_Slip_Persistence_Manager::set_callback(). Referenced by create_persistence_manager().
|
|
Should delivery of this event be retried if it fails?
Definition at line 929 of file Routing_Slip.cpp. References rssTRANSIENT.
00930 { 00931 // simple minded test: if it's transient, don't retry it 00932 // @@todo Eventually this should check timeout, discard policy, etc. 00933 return this->state_ != rssTRANSIENT; 00934 } |
|
Unmarshal from CDR.
Definition at line 852 of file Routing_Slip.cpp. References ACE_NEW_THROW_EX, delivery_methods_, TAO_Notify::Delivery_Request_Ptr, delivery_requests_, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_InputCDR::read_octet(), ACE_InputCDR::read_ulong(), ACE_Vector< T, DEFAULT_SIZE >::size(), this_ptr_, TAO_Notify_Method_Request_Lookup::unmarshal(), and TAO_Notify_Method_Request_Dispatch::unmarshal().
00853 { 00854 CORBA::ULong count = 0; 00855 cdr.read_ulong (count); 00856 for (size_t nreq = 0; nreq < count; ++nreq) 00857 { 00858 ACE_CDR::Octet code = 0; 00859 while (cdr.read_octet(code)) 00860 { 00861 try 00862 { 00863 if (code == TAO_Notify_Method_Request_Dispatch::persistence_code) 00864 { 00865 Delivery_Request * prequest; 00866 ACE_NEW_THROW_EX ( 00867 prequest, 00868 Delivery_Request(this_ptr_, this->delivery_requests_.size ()), 00869 CORBA::NO_MEMORY ()); 00870 Delivery_Request_Ptr request(prequest); 00871 TAO_Notify_Method_Request_Dispatch_Queueable * method = 00872 TAO_Notify_Method_Request_Dispatch::unmarshal ( 00873 request, 00874 ecf, 00875 cdr); 00876 if (method != 0) 00877 { 00878 this->delivery_requests_.push_back (request); 00879 this->delivery_methods_.push_back (method); 00880 } 00881 } 00882 else if (code == TAO_Notify_Method_Request_Lookup::persistence_code) 00883 { 00884 Delivery_Request_Ptr request(new Delivery_Request(this_ptr_, this->delivery_requests_.size ())); 00885 TAO_Notify_Method_Request_Lookup_Queueable * method = 00886 TAO_Notify_Method_Request_Lookup::unmarshal ( 00887 request, 00888 ecf, 00889 cdr); 00890 if (method != 0) 00891 { 00892 this->delivery_requests_.push_back (request); 00893 this->delivery_methods_.push_back (method); 00894 } 00895 } 00896 } 00897 catch (const CORBA::Exception&) 00898 { 00899 // @@todo should we log this? 00900 // just ignore failures 00901 } 00902 } 00903 } 00904 return this->delivery_requests_.size () > 0; 00905 } |
|
Wait until the event/routing_slip has been saved at least once.
Definition at line 219 of file Routing_Slip.cpp. References is_safe_, Routing_Slip_Guard, and until_safe_.
00220 { 00221 Routing_Slip_Guard guard (this->internals_); 00222 while (!this->is_safe_) 00223 { 00224 this->until_safe_.wait (); 00225 } 00226 } |
|
How many delivery requests are complete.
Definition at line 207 of file Routing_Slip.h. Referenced by all_deliveries_complete(), and delivery_request_complete(). |
|
Definition at line 51 of file Routing_Slip.cpp. Referenced by continue_state_changed(), and create(). |
|
Definition at line 49 of file Routing_Slip.cpp. Referenced by create(). |
|
Definition at line 43 of file Routing_Slip.cpp. Referenced by continue_state_new(), and create(). |
|
Definition at line 40 of file Routing_Slip.cpp. Referenced by continue_state_transient(), and create(). |
|
Definition at line 50 of file Routing_Slip.cpp. Referenced by create(), and enter_state_changed(). |
|
Definition at line 48 of file Routing_Slip.cpp. Referenced by create(), and enter_state_changed_while_saving(). |
|
Definition at line 52 of file Routing_Slip.cpp. Referenced by create(), and enter_state_complete(). |
|
Definition at line 44 of file Routing_Slip.cpp. Referenced by create(), and enter_state_complete_while_new(). |
|
Definition at line 53 of file Routing_Slip.cpp. Referenced by create(), and enter_state_deleting(). |
|
Definition at line 42 of file Routing_Slip.cpp. Referenced by create(), and enter_state_new(). |
|
Definition at line 41 of file Routing_Slip.cpp. Referenced by create(), and enter_state_reloaded(). |
|
Definition at line 46 of file Routing_Slip.cpp. Referenced by create(), and enter_state_saved(). |
|
Definition at line 45 of file Routing_Slip.cpp. Referenced by create(), and enter_state_saving(). |
|
Definition at line 54 of file Routing_Slip.cpp. Referenced by create(), and enter_state_terminal(). |
|
Definition at line 39 of file Routing_Slip.cpp. Referenced by create(), and enter_state_transient(). |
|
Definition at line 47 of file Routing_Slip.cpp. Referenced by create(), and enter_state_updating(). |
|
Methods that should be restarted during event recovery.
Definition at line 204 of file Routing_Slip.h. Referenced by reconnect(), and unmarshal(). |
|
A collection of delivery requests.
Definition at line 201 of file Routing_Slip.h. Referenced by all_deliveries_complete(), delivery_request_complete(), dispatch(), marshal(), route(), and unmarshal(). |
|
Definition at line 179 of file Routing_Slip.h. |
|
Protection for internal information.
Definition at line 167 of file Routing_Slip.h. |
|
True when event persistence QoS is guaranteed.
Definition at line 169 of file Routing_Slip.h. Referenced by enter_state_complete_while_new(), enter_state_transient(), persist_complete(), and wait_persist(). |
|
Referenced by add_to_persist_queue(), at_front_of_persist_queue(), enter_state_saving(), and persist_complete(). |
|
Definition at line 38 of file Routing_Slip.cpp. Referenced by Routing_Slip(). |
|
Pointer to a Routing_Slip_Persistence_Manager.
Definition at line 210 of file Routing_Slip.h. Referenced by create_persistence_manager(), enter_state_deleting(), enter_state_saving(), enter_state_updating(), and set_rspm(). |
|
Definition at line 212 of file Routing_Slip.h. Referenced by Routing_Slip(), and sequence(). |
|
Definition at line 37 of file Routing_Slip.cpp. Referenced by Routing_Slip(). |
|
A mini-state machine to control persistence. See external doc for circles and arrows. |
|
Smart pointer to this object. Provides continuity between smart pointers and "Routing_Slip::this". Also lets the Routing_Slip manage its own minimum lifetime. Definition at line 176 of file Routing_Slip.h. Referenced by enter_state_terminal(), and unmarshal(). |
|
Signalled when is_safe_ becomes true.
Definition at line 171 of file Routing_Slip.h. Referenced by enter_state_complete_while_new(), enter_state_transient(), persist_complete(), and wait_persist(). |