#include <Routing_Slip.h>
Inheritance diagram for TAO_Notify::Routing_Slip:
Interacts with persistent storage to provide reliable delivery.
Definition at line 66 of file Routing_Slip.h.
|
|
A mini-state machine to control persistence. See external doc for circles and arrows.
Definition at line 184 of file Routing_Slip.h.
00185 { 00186 rssCREATING, 00187 rssTRANSIENT, 00188 rssRELOADED, 00189 rssNEW, 00190 rssCOMPLETE_WHILE_NEW, 00191 rssSAVING, 00192 rssSAVED, 00193 rssUPDATING, 00194 rssCHANGED_WHILE_SAVING, 00195 rssCHANGED, 00196 rssCOMPLETE, 00197 rssDELETING, 00198 rssTERMINAL 00199 } state_; |
|
Destructor (should be private, but that inspires compiler wars).
Definition at line 188 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, and LM_DEBUG.
00189 { 00190 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00191 ACE_TEXT ("(%P|%t) Routing Slip #%d: destructor\n"), 00192 this->sequence_ 00193 )); 00194 } |
|
Private constructor for use by create method.
Definition at line 170 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, routing_slip_sequence_, sequence_, and sequence_lock_.
00172 : is_safe_ (false) 00173 , until_safe_ (internals_) 00174 , this_ptr_ (0) 00175 , event_(event) 00176 , state_ (rssCREATING) 00177 , complete_requests_ (0) 00178 , rspm_ (0) 00179 { 00180 Routing_Slip_Guard guard (sequence_lock_); 00181 this->sequence_ = ++routing_slip_sequence_; 00182 if (DEBUG_LEVEL > 1) ACE_DEBUG ((LM_DEBUG, 00183 ACE_TEXT ("(%P|%t) Routing Slip #%d: constructor\n"), 00184 this->sequence_ 00185 )); 00186 } |
|
This routing_slip needs to be saved.
Definition at line 578 of file Routing_Slip.cpp. References ACE_Guard< ACE_LOCK >::acquire(), TAO_Notify::Routing_Slip_Queue::add(), persistent_queue_, ACE_Guard< ACE_LOCK >::release(), and Routing_Slip_Guard. Referenced by enter_state_changed(), and enter_state_new().
00579 { 00580 guard.release (); 00581 this->persistent_queue_.add (this->this_ptr_); 00582 guard.acquire (); // necessary? 00583 } |
|
Test to see if all deliveries are complete.
Definition at line 572 of file Routing_Slip.cpp. References complete_requests_, delivery_requests_, and ACE_Vector< T, DEFAULT_SIZE >::size(). Referenced by continue_state_changed(), continue_state_new(), continue_state_transient(), enter_state_changed(), and enter_state_transient().
00573 { 00574 return this->complete_requests_ == this->delivery_requests_.size (); 00575 } |
|
This Routing_Slip reached the front of the persistence queue.
Definition at line 451 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), TAO_Notify::Routing_Slip_Queue::complete(), DEBUG_LEVEL, enter_state_deleting(), enter_state_saving(), enter_state_terminal(), enter_state_updating(), LM_DEBUG, LM_ERROR, persistent_queue_, Routing_Slip_Guard, rssCHANGED, rssCOMPLETE, rssCOMPLETE_WHILE_NEW, and rssNEW.
00452 { 00453 Routing_Slip_Guard guard (this->internals_); 00454 State state = this->state_; 00455 switch (state) 00456 { 00457 case rssNEW: 00458 { 00459 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00460 ACE_TEXT ("(%P|%t) Routing Slip #%d: NEW Reached front of queue\n"), 00461 this->sequence_ 00462 )); 00463 enter_state_saving (guard); 00464 break; 00465 } 00466 case rssCOMPLETE_WHILE_NEW: 00467 { 00468 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00469 ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE_WHILE_NEW Reached front of queue\n"), 00470 this->sequence_ 00471 )); 00472 this->persistent_queue_.complete (); 00473 enter_state_terminal (guard); 00474 break; 00475 } 00476 case rssCHANGED: 00477 { 00478 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00479 ACE_TEXT ("(%P|%t) Routing Slip #%d: CHANGED Reached front of queue\n"), 00480 this->sequence_ 00481 )); 00482 enter_state_updating (guard); 00483 break; 00484 } 00485 case rssCOMPLETE: 00486 { 00487 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00488 ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE Reached front of queue\n"), 00489 this->sequence_ 00490 )); 00491 enter_state_deleting (guard); 00492 break; 00493 } 00494 default: 00495 { 00496 ACE_ERROR ((LM_ERROR, 00497 ACE_TEXT ("(%P|%t) Routing Slip %d: Unexpected at_front_of_persist_queue in state %d\n"), 00498 this->sequence_, 00499 static_cast<int> (this->state_) 00500 )); 00501 break; 00502 } 00503 } 00504 } |
|
Definition at line 774 of file Routing_Slip.cpp. References all_deliveries_complete(), count_continue_changed_, enter_state_complete(), and Routing_Slip_Guard. Referenced by delivery_request_complete().
00775 { 00776 ++count_continue_changed_; 00777 if (all_deliveries_complete ()) 00778 { 00779 enter_state_complete (guard); 00780 } 00781 } |
|
Definition at line 749 of file Routing_Slip.cpp. References Routing_Slip_Guard. Referenced by delivery_request_complete().
00750 {
00751 ACE_UNUSED_ARG (guard);
00752 // no action necessary
00753 }
|
|
Definition at line 601 of file Routing_Slip.cpp. References all_deliveries_complete(), count_continue_new_, enter_state_complete_while_new(), and Routing_Slip_Guard. Referenced by delivery_request_complete().
00602 { 00603 ++count_continue_new_; 00604 if (all_deliveries_complete ()) 00605 { 00606 this->enter_state_complete_while_new (guard); 00607 } 00608 } |
|
Definition at line 661 of file Routing_Slip.cpp. References all_deliveries_complete(), count_continue_transient_, enter_state_terminal(), and Routing_Slip_Guard. Referenced by delivery_request_complete().
00662 { 00663 ++count_continue_transient_; 00664 if (all_deliveries_complete ()) 00665 { 00666 enter_state_terminal (guard); 00667 } 00668 } |
|
"Factory" method for use during reload from persistent storage.
Definition at line 108 of file Routing_Slip.cpp. References ACE_CATCHANY, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, create(), event(), LM_ERROR, TAO_Notify::Routing_Slip_Persistence_Manager::reload(), ACE_Strong_Bound_Ptr< X, ACE_LOCK >::reset(), and TAO_Notify::Routing_Slip_Ptr.
00111 { 00112 Routing_Slip_Ptr result; 00113 ACE_Message_Block * event_mb = 0; 00114 ACE_Message_Block * rs_mb = 0; 00115 ACE_DECLARE_NEW_ENV; 00116 ACE_TRY 00117 { 00118 if (rspm->reload (event_mb, rs_mb)) 00119 { 00120 TAO_InputCDR cdr_event (event_mb); 00121 TAO_Notify_Event::Ptr event (TAO_Notify_Event::unmarshal (cdr_event)); 00122 if (event.isSet()) 00123 { 00124 result = create (event ACE_ENV_ARG_PARAMETER); 00125 ACE_TRY_CHECK; 00126 TAO_InputCDR cdr_rs (rs_mb); 00127 if ( result->unmarshal (ecf, cdr_rs)) 00128 { 00129 result->set_rspm (rspm); 00130 } 00131 else 00132 { 00133 ACE_ERROR ((LM_ERROR, 00134 ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for routing slip.\n") 00135 )); 00136 result.reset (); 00137 } 00138 } 00139 else 00140 { 00141 ACE_ERROR ((LM_ERROR, 00142 ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for event.\n") 00143 )); 00144 } 00145 } 00146 } 00147 ACE_CATCHANY 00148 { 00149 ACE_ERROR ((LM_ERROR, 00150 ACE_TEXT ("(%P|%t) Routing_Slip::create: Exception reloading event.\n") 00151 )); 00152 } 00153 ACE_ENDTRY; 00154 delete event_mb; 00155 delete rs_mb; 00156 00157 return result; 00158 } |
|
"Factory" method for normal use.
Definition at line 56 of file Routing_Slip.cpp. References ACE_CHECK_RETURN, ACE_ERROR, ACE_NEW_THROW_EX, ACE_TEXT(), count_continue_changed_, count_continue_changed_while_saving_, count_continue_new_, count_continue_transient_, count_enter_changed_, count_enter_changed_while_saving_, count_enter_complete_, count_enter_complete_while_new_, count_enter_deleting_, count_enter_new_, count_enter_reloaded_, count_enter_saved_, count_enter_saving_, count_enter_terminal_, count_enter_transient_, count_enter_updating_, DEBUG_LEVEL, LM_ERROR, and TAO_Notify::Routing_Slip_Ptr. Referenced by create(), TAO_Notify_EventChannelFactory::load_event_persistence(), and TAO_Notify_ProxyConsumer::push_i().
00057 { 00058 Routing_Slip * prs; 00059 ACE_NEW_THROW_EX (prs, Routing_Slip (event), CORBA::NO_MEMORY ()); 00060 ACE_CHECK_RETURN (Routing_Slip_Ptr()); 00061 Routing_Slip_Ptr result(prs); 00062 result->this_ptr_ = result; // let the pointers touch so they use the same ref count 00063 00064 // note we don't care about ultra-precise stats, so no guard for these 00065 if (DEBUG_LEVEL > 8 && ((result->sequence_ % 100) == 0)) 00066 { 00067 ACE_ERROR ((LM_ERROR, 00068 ACE_TEXT ("(%P|%t) Routing_Slip_Statistics\n") 00069 ACE_TEXT (" enter_transient \t%d\n") 00070 ACE_TEXT (" continue_transient \t%d\n") 00071 ACE_TEXT (" enter_reloaded \t%d\n") 00072 ACE_TEXT (" enter_new \t%d\n") 00073 ACE_TEXT (" continue_new \t%d\n") 00074 ACE_TEXT (" enter_complete_while_new \t%d\n") 00075 ACE_TEXT (" enter_saving \t%d\n") 00076 ACE_TEXT (" enter_saved \t%d\n") 00077 ACE_TEXT (" enter_updating \t%d\n") 00078 ACE_TEXT (" enter_changed_while_saving \t%d\n") 00079 ACE_TEXT (" continue_changed_while_saving\t%d\n") 00080 ACE_TEXT (" enter_changed \t%d\n") 00081 ACE_TEXT (" continue_changed \t%d\n") 00082 ACE_TEXT (" enter_complete \t%d\n") 00083 ACE_TEXT (" enter_deleting \t%d\n") 00084 ACE_TEXT (" enter_terminal \t%d\n") 00085 , static_cast<int> (count_enter_transient_) 00086 , static_cast<int> (count_continue_transient_) 00087 , static_cast<int> (count_enter_reloaded_) 00088 , static_cast<int> (count_enter_new_) 00089 , static_cast<int> (count_continue_new_) 00090 , static_cast<int> (count_enter_complete_while_new_) 00091 , static_cast<int> (count_enter_saving_) 00092 , static_cast<int> (count_enter_saved_) 00093 , static_cast<int> (count_enter_updating_) 00094 , static_cast<int> (count_enter_changed_while_saving_) 00095 , static_cast<int> (count_continue_changed_while_saving_) 00096 , static_cast<int> (count_enter_changed_) 00097 , static_cast<int> (count_continue_changed_) 00098 , static_cast<int> (count_enter_complete_) 00099 , static_cast<int> (count_enter_deleting_) 00100 , 
static_cast<int> (count_enter_terminal_) 00101 )); 00102 } 00103 return result; 00104 } |
|
Definition at line 197 of file Routing_Slip.cpp. References TAO_Notify::Event_Persistence_Factory::create_routing_slip_persistence_manager(), TAO_Notify::Event_Persistence_Strategy::get_factory(), rspm_, and set_rspm(). Referenced by enter_state_saving().
00198 { 00199 if (this->rspm_ == 0) 00200 { 00201 Event_Persistence_Strategy * strategy = 00202 ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence"); 00203 if (strategy != 0) 00204 { 00205 Event_Persistence_Factory * factory = strategy->get_factory (); 00206 if (factory != 0) 00207 { 00208 set_rspm (factory->create_routing_slip_persistence_manager(this)); 00209 } 00210 } 00211 } 00212 return this->rspm_ != 0; 00213 } |
|
A delivery request has been satisfied.
Definition at line 386 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, ACE_TEXT(), complete_requests_, continue_state_changed(), continue_state_changed_while_saving(), continue_state_new(), continue_state_transient(), DEBUG_LEVEL, delivery_requests_, enter_state_changed(), enter_state_changed_while_saving(), LM_DEBUG, LM_ERROR, Routing_Slip_Guard, rssCHANGED, rssCHANGED_WHILE_SAVING, rssNEW, rssSAVED, rssSAVING, rssTRANSIENT, rssUPDATING, and ACE_Vector< T, DEFAULT_SIZE >::size().
00387 { 00388 Routing_Slip_Guard guard (this->internals_); 00389 ACE_ASSERT (request_id < this->delivery_requests_.size ()); 00390 // reset the pointer to allow the delivery_request to be deleted. 00391 this->delivery_requests_[request_id].reset (); 00392 this->complete_requests_ += 1; 00393 00394 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00395 ACE_TEXT ("(%P|%t) Routing Slip #%d: delivery_request_complete #%d: completed %d of %d\n"), 00396 this->sequence_, 00397 static_cast<int> (request_id), 00398 static_cast<int> (this->complete_requests_), 00399 static_cast<int> (this->delivery_requests_.size ()) 00400 )); 00401 State state = this->state_; 00402 switch (state) 00403 { 00404 case rssTRANSIENT: 00405 { 00406 continue_state_transient (guard); 00407 break; 00408 } 00409 case rssNEW: 00410 { 00411 continue_state_new (guard); 00412 break; 00413 } 00414 case rssSAVING: 00415 { 00416 enter_state_changed_while_saving (guard); 00417 break; 00418 } 00419 case rssUPDATING: 00420 { 00421 enter_state_changed_while_saving (guard); 00422 break; 00423 } 00424 case rssSAVED: 00425 { 00426 enter_state_changed (guard); 00427 break; 00428 } 00429 case rssCHANGED_WHILE_SAVING: 00430 { 00431 continue_state_changed_while_saving (guard); 00432 break; 00433 } 00434 case rssCHANGED: 00435 { 00436 continue_state_changed (guard); 00437 break; 00438 } 00439 default: 00440 { 00441 ACE_ERROR ((LM_ERROR, 00442 ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected delivery_request_complete in state %d\n"), 00443 static_cast<int> (this->state_) 00444 )); 00445 break; 00446 } 00447 } 00448 } |
|
Schedule delivery to a consumer via a proxy supplier.
Definition at line 332 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_CHECK, ACE_DEBUG, ACE_ENV_ARG_PARAMETER, ACE_TEXT(), DEBUG_LEVEL, TAO_Notify::Delivery_Request_Ptr, delivery_requests_, TAO_Notify_Object::execute_task(), TAO_Notify_Object::has_shutdown(), TAO_Notify_Object::id(), LM_DEBUG, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rssCREATING, and ACE_Vector< T, DEFAULT_SIZE >::size().
00336 { 00337 // cannot be the first action 00338 ACE_ASSERT (this->state_ != rssCREATING); 00339 00340 TAO_Notify_ProxySupplier::Ptr psgrd(ps); 00341 Routing_Slip_Guard guard (this->internals_); 00342 00343 size_t request_id = delivery_requests_.size (); 00344 00345 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00346 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Dispatch %s; completed %d of %d\n"), 00347 this->sequence_, 00348 static_cast<int> (request_id), 00349 filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"), 00350 static_cast<int> (this->complete_requests_), 00351 static_cast<int> (this->delivery_requests_.size ()) 00352 )); 00353 00354 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id)); 00355 if (! ps->has_shutdown() ) 00356 { 00357 this->delivery_requests_.push_back (request); 00358 TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter); 00359 guard.release (); 00360 if (DEBUG_LEVEL > 8) 00361 ACE_DEBUG ((LM_DEBUG, 00362 "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to " 00363 "proxy supplier %d\n", 00364 this->sequence_, 00365 static_cast<int> (request_id), 00366 ps->id())); 00367 ps->execute_task (method ACE_ENV_ARG_PARAMETER); 00368 ACE_CHECK; 00369 } 00370 else 00371 { 00372 if (DEBUG_LEVEL > 5) 00373 ACE_DEBUG ((LM_DEBUG, 00374 "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to " 00375 "proxy supplier %d; already shut down\n", 00376 this->sequence_, 00377 static_cast<int> (request_id), 00378 ps->id())); 00379 } 00380 } |
|
Definition at line 756 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), add_to_persist_queue(), all_deliveries_complete(), count_enter_changed_, DEBUG_LEVEL, enter_state_complete(), LM_DEBUG, Routing_Slip_Guard, and rssCHANGED. Referenced by delivery_request_complete(), and persist_complete().
00757 { 00758 ++count_enter_changed_; 00759 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00760 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED\n"), 00761 this->sequence_ 00762 )); 00763 // complete state change BEFORE initiating request to avoid 00764 // race condition if request finishes before state is stable. 00765 this->state_ = rssCHANGED; 00766 if (all_deliveries_complete ()) 00767 { 00768 enter_state_complete (guard); 00769 } 00770 add_to_persist_queue (guard); 00771 } |
|
Definition at line 737 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_changed_while_saving_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssCHANGED_WHILE_SAVING. Referenced by delivery_request_complete().
00738 { 00739 ++count_enter_changed_while_saving_; 00740 ACE_UNUSED_ARG (guard); 00741 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00742 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED_WHILE_SAVING\n"), 00743 this->sequence_ 00744 )); 00745 this->state_ = rssCHANGED_WHILE_SAVING; 00746 } |
|
Definition at line 784 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_complete_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssCOMPLETE. Referenced by continue_state_changed(), and enter_state_changed().
00785 { 00786 ++count_enter_complete_; 00787 ACE_UNUSED_ARG (guard); 00788 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00789 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE\n"), 00790 this->sequence_ 00791 )); 00792 this->state_ = rssCOMPLETE; 00793 } |
|
Definition at line 610 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_complete_while_new_, DEBUG_LEVEL, is_safe_, LM_DEBUG, Routing_Slip_Guard, rssCOMPLETE_WHILE_NEW, and until_safe_. Referenced by continue_state_new().
00611 { 00612 ++count_enter_complete_while_new_; 00613 ACE_UNUSED_ARG (guard); 00614 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00615 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE_WHILE_NEW\n"), 00616 this->sequence_ 00617 )); 00618 // allow the ConsumerProxy to return from the CORBA push call. 00619 if (! is_safe_) 00620 { 00621 is_safe_ = true; 00622 this->until_safe_.signal (); 00623 } 00624 this->state_ = rssCOMPLETE_WHILE_NEW; 00625 } |
|
Definition at line 796 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), count_enter_deleting_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), TAO_Notify::Routing_Slip_Persistence_Manager::remove(), Routing_Slip_Guard, rspm_, and rssDELETING. Referenced by at_front_of_persist_queue().
00797 { 00798 ++count_enter_deleting_; 00799 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00800 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state DELETING\n"), 00801 this->sequence_ 00802 )); 00803 this->state_ = rssDELETING; 00804 guard.release (); 00805 this->rspm_->remove (); 00806 guard.acquire (); // necessary? 00807 } |
|
Definition at line 589 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), add_to_persist_queue(), count_enter_new_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssNEW. Referenced by route().
00590 { 00591 ++count_enter_new_; 00592 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00593 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state NEW\n"), 00594 this->sequence_ 00595 )); 00596 this->state_ = rssNEW; 00597 add_to_persist_queue(guard); 00598 } |
|
Definition at line 628 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_reloaded_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssRELOADED.
00629 { 00630 ++count_enter_reloaded_; 00631 ACE_UNUSED_ARG (guard); 00632 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00633 ACE_TEXT ("(%P|%t) Routing Slip #&d: enter state RELOADED\n"), 00634 this->sequence_ 00635 )); 00636 this->state_ = rssRELOADED; 00637 } |
|
Definition at line 704 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_saved_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssSAVED. Referenced by persist_complete(), and reconnect().
00705 { 00706 ++count_enter_saved_; 00707 ACE_UNUSED_ARG (guard); 00708 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00709 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVED\n"), 00710 this->sequence_ 00711 )); 00712 this->state_ = rssSAVED; 00713 } |
|
Definition at line 670 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), ACE_OutputCDR::begin(), TAO_Notify::Routing_Slip_Queue::complete(), count_enter_saving_, create_persistence_manager(), DEBUG_LEVEL, enter_state_transient(), LM_DEBUG, marshal(), persistent_queue_, ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rspm_, rssSAVING, and TAO_Notify::Routing_Slip_Persistence_Manager::store(). Referenced by at_front_of_persist_queue().
00671 { 00672 ++count_enter_saving_; 00673 if (!create_persistence_manager ()) 00674 { 00675 // Note This should actually be a throw (out of memory) 00676 // but we cheat and make this a transient event. 00677 this->persistent_queue_.complete (); 00678 enter_state_transient (guard); 00679 } 00680 else 00681 { 00682 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00683 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVING\n"), 00684 this->sequence_ 00685 )); 00686 this->state_ = rssSAVING; 00687 00688 TAO_OutputCDR event_cdr; 00689 this->event_->marshal (event_cdr); 00690 00691 const ACE_Message_Block *event_mb = event_cdr.begin (); 00692 TAO_OutputCDR rs_cdr; 00693 marshal (rs_cdr); 00694 const ACE_Message_Block *rs_mb = rs_cdr.begin (); 00695 00696 guard.release (); 00697 this->rspm_->store (*event_mb, *rs_mb); 00698 00699 guard.acquire (); // necessary? 00700 } 00701 } |
|
Definition at line 810 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), count_enter_terminal_, DEBUG_LEVEL, LM_DEBUG, ACE_Strong_Bound_Ptr< X, ACE_LOCK >::reset(), Routing_Slip_Guard, rssTERMINAL, and this_ptr_. Referenced by at_front_of_persist_queue(), continue_state_transient(), enter_state_transient(), and persist_complete().
00811 { 00812 ++count_enter_terminal_; 00813 ACE_UNUSED_ARG (guard); 00814 ACE_ASSERT( this->is_safe_); 00815 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00816 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TERMINAL\n"), 00817 this->sequence_ 00818 )); 00819 this->state_ = rssTERMINAL; 00820 this->this_ptr_.reset (); 00821 } |
|
Definition at line 640 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), all_deliveries_complete(), count_enter_transient_, DEBUG_LEVEL, enter_state_terminal(), is_safe_, LM_DEBUG, Routing_Slip_Guard, rssTRANSIENT, and until_safe_. Referenced by enter_state_saving(), and route().
00641 { 00642 ++count_enter_transient_; 00643 ACE_UNUSED_ARG (guard); 00644 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00645 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TRANSIENT\n"), 00646 this->sequence_ 00647 )); 00648 this->state_ = rssTRANSIENT; 00649 if (! is_safe_) 00650 { 00651 is_safe_ = true; 00652 this->until_safe_.signal (); 00653 } 00654 if (all_deliveries_complete ()) 00655 { 00656 enter_state_terminal (guard); 00657 } 00658 } |
|
Definition at line 716 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), ACE_OutputCDR::begin(), count_enter_updating_, DEBUG_LEVEL, LM_DEBUG, marshal(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rspm_, rssUPDATING, and TAO_Notify::Routing_Slip_Persistence_Manager::update(). Referenced by at_front_of_persist_queue().
00717 { 00718 ++count_enter_updating_; 00719 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00720 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state UPDATING\n"), 00721 this->sequence_ 00722 )); 00723 this->state_ = rssUPDATING; 00724 00725 TAO_OutputCDR rs_cdr; 00726 marshal (rs_cdr); 00727 const ACE_Message_Block *rs_mb = rs_cdr.begin (); 00728 guard.release (); 00729 00730 ACE_ASSERT (this->rspm_ != 0); 00731 this->rspm_->update (*rs_mb); 00732 guard.acquire (); // necessary? 00733 } |
|
Definition at line 216 of file Routing_Slip.cpp. Referenced by create().
00217 { 00218 return this->event_; 00219 } |
|
Marshal into a CDR.
Definition at line 824 of file Routing_Slip.cpp. References delivery_requests_, ACE_Array_Base< T >::get(), TAO_Notify::Delivery_Request::marshal(), ACE_Vector< T, DEFAULT_SIZE >::size(), and ACE_OutputCDR::write_ulong(). Referenced by enter_state_saving(), and enter_state_updating().
00825 { 00826 size_t request_count = this->delivery_requests_.size(); 00827 cdr.write_ulong (request_count - this->complete_requests_); 00828 for (size_t nreq = 0; nreq < request_count; ++nreq) 00829 { 00830 Delivery_Request * request = this->delivery_requests_[nreq].get (); 00831 if (request != 0) 00832 { 00833 request->marshal (cdr); 00834 } 00835 } 00836 } |
|
The persistent storage has completed the last request.
Implements TAO_Notify::Persistent_Callback. Definition at line 507 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, ACE_TEXT(), TAO_Notify::Routing_Slip_Queue::complete(), DEBUG_LEVEL, enter_state_changed(), enter_state_saved(), enter_state_terminal(), is_safe_, LM_DEBUG, LM_ERROR, ACE_Guard< ACE_LOCK >::locked(), persistent_queue_, Routing_Slip_Guard, TAO_Notify::Routing_Slip_Ptr, rssCHANGED_WHILE_SAVING, rssDELETING, rssSAVING, rssUPDATING, and until_safe_.
00508 { 00509 // keep this object around til this method returns. 00510 Routing_Slip_Ptr me(this->this_ptr_); 00511 Routing_Slip_Guard guard (this->internals_); 00512 ACE_ASSERT (guard.locked ()); 00513 00514 // allow the ConsumerProxy to return from the CORBA push call. 00515 if (! is_safe_) 00516 { 00517 is_safe_ = true; 00518 this->until_safe_.signal (); 00519 } 00520 00521 State state = this->state_; 00522 switch (state) 00523 { 00524 case rssSAVING: 00525 { 00526 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00527 ACE_TEXT ("(%P|%t) Routing Slip #%d: SAVING persist complete\n"), 00528 this->sequence_ 00529 )); 00530 enter_state_saved(guard); 00531 break; 00532 } 00533 case rssCHANGED_WHILE_SAVING: 00534 { 00535 enter_state_changed (guard); 00536 break; 00537 } 00538 case rssUPDATING: 00539 { 00540 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00541 ACE_TEXT ("(%P|%t) Routing Slip #%d: UPDATING persist complete\n"), 00542 this->sequence_ 00543 )); 00544 enter_state_saved (guard); 00545 break; 00546 } 00547 case rssDELETING: 00548 { 00549 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00550 ACE_TEXT ("(%P|%t) Routing Slip #%d: DELETING persist complete\n"), 00551 this->sequence_ 00552 )); 00553 enter_state_terminal (guard); 00554 break; 00555 } 00556 default: 00557 { 00558 ACE_ERROR ((LM_ERROR, 00559 ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected transition in state %d\n"), 00560 static_cast<int> (this->state_) 00561 )); 00562 break; 00563 } 00564 } 00565 this->persistent_queue_.complete (); 00566 } |
|
Definition at line 902 of file Routing_Slip.cpp. References ACE_CHECK, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_Vector< T, DEFAULT_SIZE >::clear(), delivery_methods_, enter_state_saved(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, and ACE_Vector< T, DEFAULT_SIZE >::size().
00903 { 00904 Routing_Slip_Guard guard (this->internals_); 00905 enter_state_saved (guard); 00906 guard.release (); 00907 //@@todo is there a worker_task available to do this? 00908 size_t count = this->delivery_methods_.size (); 00909 for (size_t nmethod = 0; nmethod < count; ++nmethod) 00910 { 00911 this->delivery_methods_[nmethod]->execute (ACE_ENV_SINGLE_ARG_PARAMETER); 00912 ACE_CHECK; 00913 } 00914 this->delivery_methods_.clear (); 00915 } |
|
Route this event to destinations. This must be the first action requested after the routing slip is created. Definition at line 232 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_ENV_ARG_PARAMETER, ACE_TEXT(), DEBUG_LEVEL, TAO_Notify::Delivery_Request_Ptr, delivery_requests_, enter_state_new(), enter_state_transient(), TAO_Notify_Object::execute_task(), LM_DEBUG, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rssCREATING, and ACE_Vector< T, DEFAULT_SIZE >::size().
00233 { 00234 ACE_ASSERT(pc != 0); 00235 00236 TAO_Notify_ProxyConsumer::Ptr pcgrd(pc); 00237 00238 Routing_Slip_Guard guard (this->internals_); 00239 00240 size_t request_id = delivery_requests_.size (); 00241 00242 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00243 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: lookup, completed %d of %d\n"), 00244 this->sequence_, 00245 static_cast<int> (request_id), 00246 static_cast<int> (this->complete_requests_), 00247 static_cast<int> (this->delivery_requests_.size ()) 00248 )); 00249 00250 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id)); 00251 this->delivery_requests_.push_back (request); 00252 TAO_Notify_Method_Request_Lookup_Queueable method (request, pc); 00253 00254 if (this->state_ == rssCREATING) 00255 { 00256 if (! reliable_channel) 00257 { 00258 enter_state_transient (guard); 00259 } 00260 else if (ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence") == 0) 00261 { 00262 enter_state_transient (guard); 00263 } 00264 else if (! this->event_->reliable().is_valid()) 00265 { 00266 enter_state_new (guard); 00267 } 00268 else if (this->event_->reliable().value() == CosNotification::Persistent) 00269 { 00270 enter_state_new (guard); 00271 } 00272 else 00273 { 00274 enter_state_transient (guard); 00275 } 00276 } 00277 guard.release (); 00278 pc->execute_task (method ACE_ENV_ARG_PARAMETER); 00279 } |
|
Provide an identifying number for this Routing Slip to use in debug messages.
Definition at line 918 of file Routing_Slip.cpp. References sequence_.
00919 { 00920 return this->sequence_; 00921 } |
|
Definition at line 161 of file Routing_Slip.cpp. References rspm_, and TAO_Notify::Routing_Slip_Persistence_Manager::set_callback(). Referenced by create_persistence_manager().
|
|
Should delivery of this event be retried if it fails?
Definition at line 924 of file Routing_Slip.cpp. References rssTRANSIENT.
00925 { 00926 // simple minded test: if it's transient, don't retry it 00927 // @@todo Eventually this should check timeout, discard policy, etc. 00928 return this->state_ != rssTRANSIENT; 00929 } |
|
Unmarshal from CDR.
Definition at line 839 of file Routing_Slip.cpp. References ACE_CATCHANY, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_NEW_THROW_EX, ACE_TRY, ACE_TRY_CHECK, delivery_methods_, TAO_Notify::Delivery_Request_Ptr, delivery_requests_, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_InputCDR::read_octet(), ACE_InputCDR::read_ulong(), ACE_Vector< T, DEFAULT_SIZE >::size(), this_ptr_, TAO_Notify_Method_Request_Lookup::unmarshal(), and TAO_Notify_Method_Request_Dispatch::unmarshal().
00840 { 00841 CORBA::ULong count = 0; 00842 cdr.read_ulong (count); 00843 for (size_t nreq = 0; nreq < count; ++nreq) 00844 { 00845 ACE_CDR::Octet code = 0; 00846 while (cdr.read_octet(code)) 00847 { 00848 ACE_DECLARE_NEW_ENV; 00849 ACE_TRY 00850 { 00851 if (code == TAO_Notify_Method_Request_Dispatch::persistence_code) 00852 { 00853 Delivery_Request * prequest; 00854 ACE_NEW_THROW_EX ( 00855 prequest, 00856 Delivery_Request(this_ptr_, this->delivery_requests_.size ()), 00857 CORBA::NO_MEMORY ()); 00858 ACE_TRY_CHECK; 00859 Delivery_Request_Ptr request(prequest); 00860 TAO_Notify_Method_Request_Dispatch_Queueable * method = 00861 TAO_Notify_Method_Request_Dispatch::unmarshal ( 00862 request, 00863 ecf, 00864 cdr 00865 ACE_ENV_ARG_PARAMETER); 00866 ACE_TRY_CHECK; 00867 if (method != 0) 00868 { 00869 this->delivery_requests_.push_back (request); 00870 this->delivery_methods_.push_back (method); 00871 } 00872 } 00873 else if (code == TAO_Notify_Method_Request_Lookup::persistence_code) 00874 { 00875 Delivery_Request_Ptr request(new Delivery_Request(this_ptr_, this->delivery_requests_.size ())); 00876 TAO_Notify_Method_Request_Lookup_Queueable * method = 00877 TAO_Notify_Method_Request_Lookup::unmarshal ( 00878 request, 00879 ecf, 00880 cdr 00881 ACE_ENV_ARG_PARAMETER); 00882 ACE_TRY_CHECK 00883 if (method != 0) 00884 { 00885 this->delivery_requests_.push_back (request); 00886 this->delivery_methods_.push_back (method); 00887 } 00888 } 00889 } 00890 ACE_CATCHANY; 00891 { 00892 // @@todo should we log this? 00893 // just ignore failures 00894 } 00895 ACE_ENDTRY; 00896 } 00897 } 00898 return this->delivery_requests_.size () > 0; 00899 } |
|
Wait until the event/routing_slip has been saved at least once.
Definition at line 222 of file Routing_Slip.cpp. References is_safe_, Routing_Slip_Guard, and until_safe_.
00223 { 00224 Routing_Slip_Guard guard (this->internals_); 00225 while (!this->is_safe_) 00226 { 00227 this->until_safe_.wait (); 00228 } 00229 } |
|
How many delivery requests are complete.
Definition at line 208 of file Routing_Slip.h. Referenced by all_deliveries_complete(), and delivery_request_complete(). |
|
Definition at line 50 of file Routing_Slip.cpp. Referenced by continue_state_changed(), and create(). |
|
Definition at line 48 of file Routing_Slip.cpp. Referenced by create(). |
|
Definition at line 42 of file Routing_Slip.cpp. Referenced by continue_state_new(), and create(). |
|
Definition at line 39 of file Routing_Slip.cpp. Referenced by continue_state_transient(), and create(). |
|
Definition at line 49 of file Routing_Slip.cpp. Referenced by create(), and enter_state_changed(). |
|
Definition at line 47 of file Routing_Slip.cpp. Referenced by create(), and enter_state_changed_while_saving(). |
|
Definition at line 51 of file Routing_Slip.cpp. Referenced by create(), and enter_state_complete(). |
|
Definition at line 43 of file Routing_Slip.cpp. Referenced by create(), and enter_state_complete_while_new(). |
|
Definition at line 52 of file Routing_Slip.cpp. Referenced by create(), and enter_state_deleting(). |
|
Definition at line 41 of file Routing_Slip.cpp. Referenced by create(), and enter_state_new(). |
|
Definition at line 40 of file Routing_Slip.cpp. Referenced by create(), and enter_state_reloaded(). |
|
Definition at line 45 of file Routing_Slip.cpp. Referenced by create(), and enter_state_saved(). |
|
Definition at line 44 of file Routing_Slip.cpp. Referenced by create(), and enter_state_saving(). |
|
Definition at line 53 of file Routing_Slip.cpp. Referenced by create(), and enter_state_terminal(). |
|
Definition at line 38 of file Routing_Slip.cpp. Referenced by create(), and enter_state_transient(). |
|
Definition at line 46 of file Routing_Slip.cpp. Referenced by create(), and enter_state_updating(). |
|
Methods that should be restarted during event recovery.
Definition at line 205 of file Routing_Slip.h. Referenced by reconnect(), and unmarshal(). |
|
A collection of delivery requests.
Definition at line 202 of file Routing_Slip.h. Referenced by all_deliveries_complete(), delivery_request_complete(), dispatch(), marshal(), route(), and unmarshal(). |
|
Definition at line 180 of file Routing_Slip.h. |
|
Protection for internal information.
Definition at line 168 of file Routing_Slip.h. |
|
True when event persistence QoS is guaranteed.
Definition at line 170 of file Routing_Slip.h. Referenced by enter_state_complete_while_new(), enter_state_transient(), persist_complete(), and wait_persist(). |
|
Referenced by add_to_persist_queue(), at_front_of_persist_queue(), enter_state_saving(), and persist_complete(). |
|
Definition at line 37 of file Routing_Slip.cpp. Referenced by Routing_Slip(). |
|
Pointer to a Routing_Slip_Persistence_Manager.
Definition at line 211 of file Routing_Slip.h. Referenced by create_persistence_manager(), enter_state_deleting(), enter_state_saving(), enter_state_updating(), and set_rspm(). |
|
Definition at line 213 of file Routing_Slip.h. Referenced by Routing_Slip(), and sequence(). |
|
Definition at line 36 of file Routing_Slip.cpp. Referenced by Routing_Slip(). |
|
A mini-state machine to control persistence. See external doc for circles and arrows. |
|
Smart pointer to this object. Provides continuity between smart pointers and "Routing_Slip::this". Also lets the Routing_Slip manage its own minimum lifetime. Definition at line 177 of file Routing_Slip.h. Referenced by enter_state_terminal(), and unmarshal(). |
|
Signalled when is_safe_ becomes true.
Definition at line 172 of file Routing_Slip.h. Referenced by enter_state_complete_while_new(), enter_state_transient(), persist_complete(), and wait_persist(). |