#include <Routing_Slip.h>
Inheritance diagram for TAO_Notify::Routing_Slip:
Interacts with persistent storage to provide reliable delivery.
Definition at line 66 of file Routing_Slip.h.
typedef ACE_Guard< TAO_SYNCH_MUTEX > TAO_Notify::Routing_Slip::Routing_Slip_Guard [private] |
Definition at line 68 of file Routing_Slip.h.
enum TAO_Notify::Routing_Slip::State [private] |
A mini-state machine to control persistence See external doc for circles and arrows.
rssCREATING | |
rssTRANSIENT | |
rssRELOADED | |
rssNEW | |
rssCOMPLETE_WHILE_NEW | |
rssSAVING | |
rssSAVED | |
rssUPDATING | |
rssCHANGED_WHILE_SAVING | |
rssCHANGED | |
rssCOMPLETE | |
rssDELETING | |
rssTERMINAL |
Definition at line 183 of file Routing_Slip.h.
00184 { 00185 rssCREATING, 00186 rssTRANSIENT, 00187 rssRELOADED, 00188 rssNEW, 00189 rssCOMPLETE_WHILE_NEW, 00190 rssSAVING, 00191 rssSAVED, 00192 rssUPDATING, 00193 rssCHANGED_WHILE_SAVING, 00194 rssCHANGED, 00195 rssCOMPLETE, 00196 rssDELETING, 00197 rssTERMINAL 00198 } state_;
TAO_Notify::Routing_Slip::~Routing_Slip | ( | ) | [virtual] |
destructor (should be private but that inspires compiler wars)
Definition at line 185 of file Routing_Slip.cpp.
References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, and LM_DEBUG.
00186 { 00187 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00188 ACE_TEXT ("(%P|%t) Routing Slip #%d: destructor\n"), 00189 this->sequence_ 00190 )); 00191 }
TAO_Notify::Routing_Slip::Routing_Slip | ( | const TAO_Notify_Event::Ptr & | event | ) | [private] |
Private constructor for use by create method.
Definition at line 167 of file Routing_Slip.cpp.
References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, LM_DEBUG, routing_slip_sequence_, sequence_, and sequence_lock_.
00169 : is_safe_ (false) 00170 , until_safe_ (internals_) 00171 , this_ptr_ (0) 00172 , event_(event) 00173 , state_ (rssCREATING) 00174 , complete_requests_ (0) 00175 , rspm_ (0) 00176 { 00177 Routing_Slip_Guard guard (sequence_lock_); 00178 this->sequence_ = ++routing_slip_sequence_; 00179 if (DEBUG_LEVEL > 1) ACE_DEBUG ((LM_DEBUG, 00180 ACE_TEXT ("(%P|%t) Routing Slip #%d: constructor\n"), 00181 this->sequence_ 00182 )); 00183 }
void TAO_Notify::Routing_Slip::add_to_persist_queue | ( | Routing_Slip_Guard & | guard | ) | [private] |
This routing_slip needs to be saved.
Definition at line 582 of file Routing_Slip.cpp.
References TAO_Notify::Routing_Slip_Queue::add(), persistent_queue_, and ACE_Guard< ACE_LOCK >::release().
Referenced by enter_state_changed(), and enter_state_new().
00583 { 00584 guard.release (); 00585 this->persistent_queue_.add (this->this_ptr_); 00586 }
bool TAO_Notify::Routing_Slip::all_deliveries_complete | ( | ) | const [private] |
Test to see if all deliveries are complete.
Definition at line 576 of file Routing_Slip.cpp.
References complete_requests_, delivery_requests_, and ACE_Vector< T, DEFAULT_SIZE >::size().
Referenced by continue_state_changed(), continue_state_new(), continue_state_transient(), enter_state_changed(), and enter_state_transient().
00577 { 00578 return this->complete_requests_ == this->delivery_requests_.size (); 00579 }
void TAO_Notify::Routing_Slip::at_front_of_persist_queue | ( | ) |
This Routing_Slip reached the front of the persistence queue.
Definition at line 453 of file Routing_Slip.cpp.
References ACE_DEBUG, ACE_ERROR, TAO_Notify::Routing_Slip_Queue::complete(), DEBUG_LEVEL, enter_state_deleting(), enter_state_saving(), enter_state_terminal(), enter_state_updating(), LM_DEBUG, LM_ERROR, persistent_queue_, ACE_Guard< ACE_LOCK >::release(), rssCHANGED, rssCOMPLETE, rssCOMPLETE_WHILE_NEW, rssNEW, and state_.
00454 { 00455 Routing_Slip_Guard guard (this->internals_); 00456 State state = this->state_; 00457 switch (state) 00458 { 00459 case rssNEW: 00460 { 00461 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00462 ACE_TEXT ("(%P|%t) Routing Slip #%d: NEW Reached front of queue\n"), 00463 this->sequence_ 00464 )); 00465 enter_state_saving (guard); 00466 break; 00467 } 00468 case rssCOMPLETE_WHILE_NEW: 00469 { 00470 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00471 ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE_WHILE_NEW Reached front of queue\n"), 00472 this->sequence_ 00473 )); 00474 guard.release (); 00475 this->persistent_queue_.complete (); 00476 enter_state_terminal (guard); 00477 break; 00478 } 00479 case rssCHANGED: 00480 { 00481 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00482 ACE_TEXT ("(%P|%t) Routing Slip #%d: CHANGED Reached front of queue\n"), 00483 this->sequence_ 00484 )); 00485 enter_state_updating (guard); 00486 break; 00487 } 00488 case rssCOMPLETE: 00489 { 00490 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00491 ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE Reached front of queue\n"), 00492 this->sequence_ 00493 )); 00494 enter_state_deleting (guard); 00495 break; 00496 } 00497 default: 00498 { 00499 ACE_ERROR ((LM_ERROR, 00500 ACE_TEXT ("(%P|%t) Routing Slip %d: Unexpected at_front_of_persist_queue in state %d\n"), 00501 this->sequence_, 00502 static_cast<int> (this->state_) 00503 )); 00504 break; 00505 } 00506 } 00507 }
void TAO_Notify::Routing_Slip::continue_state_changed | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 783 of file Routing_Slip.cpp.
References all_deliveries_complete(), count_continue_changed_, enter_state_complete(), and ACE_Guard< ACE_LOCK >::release().
Referenced by delivery_request_complete().
00784 { 00785 ++count_continue_changed_; 00786 if (all_deliveries_complete ()) 00787 { 00788 enter_state_complete (guard); 00789 } 00790 else 00791 { 00792 guard.release (); 00793 } 00794 }
void TAO_Notify::Routing_Slip::continue_state_changed_while_saving | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 758 of file Routing_Slip.cpp.
References ACE_Guard< ACE_LOCK >::release().
Referenced by delivery_request_complete().
void TAO_Notify::Routing_Slip::continue_state_new | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 604 of file Routing_Slip.cpp.
References all_deliveries_complete(), count_continue_new_, enter_state_complete_while_new(), and ACE_Guard< ACE_LOCK >::release().
Referenced by delivery_request_complete().
00605 { 00606 ++count_continue_new_; 00607 if (all_deliveries_complete ()) 00608 { 00609 this->enter_state_complete_while_new (guard); 00610 } 00611 guard.release (); 00612 }
void TAO_Notify::Routing_Slip::continue_state_transient | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 668 of file Routing_Slip.cpp.
References all_deliveries_complete(), count_continue_transient_, enter_state_terminal(), and ACE_Guard< ACE_LOCK >::release().
Referenced by delivery_request_complete().
00669 { 00670 ++count_continue_transient_; 00671 if (all_deliveries_complete ()) 00672 { 00673 enter_state_terminal (guard); 00674 } 00675 else 00676 { 00677 guard.release (); 00678 } 00679 }
Routing_Slip_Ptr TAO_Notify::Routing_Slip::create | ( | TAO_Notify_EventChannelFactory & | ecf, | |
Routing_Slip_Persistence_Manager * | rspm | |||
) | [static] |
"Factory" method for use during reload from persistent storage.
Definition at line 108 of file Routing_Slip.cpp.
References ACE_ERROR, ACE_TEXT(), create(), event(), TAO_Notify_Refcountable_Guard_T< T >::isSet(), LM_ERROR, TAO_Notify::Routing_Slip_Persistence_Manager::reload(), ACE_Strong_Bound_Ptr< X, ACE_LOCK >::reset(), and TAO_Notify_Event::unmarshal().
00111 { 00112 Routing_Slip_Ptr result; 00113 ACE_Message_Block * event_mb = 0; 00114 ACE_Message_Block * rs_mb = 0; 00115 try 00116 { 00117 if (rspm->reload (event_mb, rs_mb)) 00118 { 00119 TAO_InputCDR cdr_event (event_mb); 00120 TAO_Notify_Event::Ptr event (TAO_Notify_Event::unmarshal (cdr_event)); 00121 if (event.isSet()) 00122 { 00123 result = create (event); 00124 TAO_InputCDR cdr_rs (rs_mb); 00125 if ( result->unmarshal (ecf, cdr_rs)) 00126 { 00127 result->set_rspm (rspm); 00128 } 00129 else 00130 { 00131 ACE_ERROR ((LM_ERROR, 00132 ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for routing slip.\n") 00133 )); 00134 result.reset (); 00135 } 00136 } 00137 else 00138 { 00139 ACE_ERROR ((LM_ERROR, 00140 ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for event.\n") 00141 )); 00142 } 00143 } 00144 } 00145 catch (const CORBA::Exception&) 00146 { 00147 ACE_ERROR ((LM_ERROR, 00148 ACE_TEXT ("(%P|%t) Routing_Slip::create: Exception reloading event.\n") 00149 )); 00150 } 00151 delete event_mb; 00152 delete rs_mb; 00153 00154 return result; 00155 }
Routing_Slip_Ptr TAO_Notify::Routing_Slip::create | ( | const TAO_Notify_Event::Ptr & | event | ) | [static] |
"Factory" method for normal use.
Definition at line 57 of file Routing_Slip.cpp.
References ACE_ERROR, ACE_NEW_THROW_EX, ACE_TEXT(), count_continue_changed_, count_continue_changed_while_saving_, count_continue_new_, count_continue_transient_, count_enter_changed_, count_enter_changed_while_saving_, count_enter_complete_, count_enter_complete_while_new_, count_enter_deleting_, count_enter_new_, count_enter_reloaded_, count_enter_saved_, count_enter_saving_, count_enter_terminal_, count_enter_transient_, count_enter_updating_, DEBUG_LEVEL, event(), and LM_ERROR.
Referenced by create(), TAO_Notify_EventChannelFactory::load_event_persistence(), and TAO_Notify_ProxyConsumer::push_i().
00058 { 00059 Routing_Slip * prs; 00060 ACE_NEW_THROW_EX (prs, Routing_Slip (event), CORBA::NO_MEMORY ()); 00061 Routing_Slip_Ptr result(prs); 00062 result->this_ptr_ = result; // let the pointers touch so they use the same ref count 00063 00064 // note we don't care about ultra-precise stats, so no guard for these 00065 if (DEBUG_LEVEL > 8 && ((result->sequence_ % 100) == 0)) 00066 { 00067 ACE_ERROR ((LM_ERROR, 00068 ACE_TEXT ("(%P|%t) Routing_Slip_Statistics\n") 00069 ACE_TEXT (" enter_transient \t%d\n") 00070 ACE_TEXT (" continue_transient \t%d\n") 00071 ACE_TEXT (" enter_reloaded \t%d\n") 00072 ACE_TEXT (" enter_new \t%d\n") 00073 ACE_TEXT (" continue_new \t%d\n") 00074 ACE_TEXT (" enter_complete_while_new \t%d\n") 00075 ACE_TEXT (" enter_saving \t%d\n") 00076 ACE_TEXT (" enter_saved \t%d\n") 00077 ACE_TEXT (" enter_updating \t%d\n") 00078 ACE_TEXT (" enter_changed_while_saving \t%d\n") 00079 ACE_TEXT (" continue_changed_while_saving\t%d\n") 00080 ACE_TEXT (" enter_changed \t%d\n") 00081 ACE_TEXT (" continue_changed \t%d\n") 00082 ACE_TEXT (" enter_complete \t%d\n") 00083 ACE_TEXT (" enter_deleting \t%d\n") 00084 ACE_TEXT (" enter_terminal \t%d\n") 00085 , static_cast<int> (count_enter_transient_) 00086 , static_cast<int> (count_continue_transient_) 00087 , static_cast<int> (count_enter_reloaded_) 00088 , static_cast<int> (count_enter_new_) 00089 , static_cast<int> (count_continue_new_) 00090 , static_cast<int> (count_enter_complete_while_new_) 00091 , static_cast<int> (count_enter_saving_) 00092 , static_cast<int> (count_enter_saved_) 00093 , static_cast<int> (count_enter_updating_) 00094 , static_cast<int> (count_enter_changed_while_saving_) 00095 , static_cast<int> (count_continue_changed_while_saving_) 00096 , static_cast<int> (count_enter_changed_) 00097 , static_cast<int> (count_continue_changed_) 00098 , static_cast<int> (count_enter_complete_) 00099 , static_cast<int> (count_enter_deleting_) 00100 , static_cast<int> (count_enter_terminal_) 00101 )); 00102 } 00103 return result; 00104 }
bool TAO_Notify::Routing_Slip::create_persistence_manager | ( | ) | [private] |
Definition at line 194 of file Routing_Slip.cpp.
References TAO_Notify::Event_Persistence_Factory::create_routing_slip_persistence_manager(), TAO_Notify::Event_Persistence_Strategy::get_factory(), rspm_, and set_rspm().
Referenced by enter_state_saving().
00195 { 00196 if (this->rspm_ == 0) 00197 { 00198 Event_Persistence_Strategy * strategy = 00199 ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence"); 00200 if (strategy != 0) 00201 { 00202 Event_Persistence_Factory * factory = strategy->get_factory (); 00203 if (factory != 0) 00204 { 00205 set_rspm (factory->create_routing_slip_persistence_manager(this)); 00206 } 00207 } 00208 } 00209 return this->rspm_ != 0; 00210 }
void TAO_Notify::Routing_Slip::delivery_request_complete | ( | size_t | request_id | ) |
A delivery request has been satisfied.
Definition at line 388 of file Routing_Slip.cpp.
References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, complete_requests_, continue_state_changed(), continue_state_changed_while_saving(), continue_state_new(), continue_state_transient(), DEBUG_LEVEL, delivery_requests_, enter_state_changed(), enter_state_changed_while_saving(), LM_DEBUG, LM_ERROR, rssCHANGED, rssCHANGED_WHILE_SAVING, rssNEW, rssSAVED, rssSAVING, rssTRANSIENT, rssUPDATING, ACE_Vector< T, DEFAULT_SIZE >::size(), and state_.
00389 { 00390 Routing_Slip_Guard guard (this->internals_); 00391 ACE_ASSERT (request_id < this->delivery_requests_.size ()); 00392 // reset the pointer to allow the delivery_request to be deleted. 00393 this->delivery_requests_[request_id].reset (); 00394 this->complete_requests_ += 1; 00395 00396 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00397 ACE_TEXT ("(%P|%t) Routing Slip #%d: delivery_request_complete #%d: completed %d of %d\n"), 00398 this->sequence_, 00399 static_cast<int> (request_id), 00400 static_cast<int> (this->complete_requests_), 00401 static_cast<int> (this->delivery_requests_.size ()) 00402 )); 00403 State state = this->state_; 00404 switch (state) 00405 { 00406 case rssTRANSIENT: 00407 { 00408 continue_state_transient (guard); 00409 break; 00410 } 00411 case rssNEW: 00412 { 00413 continue_state_new (guard); 00414 break; 00415 } 00416 case rssSAVING: 00417 { 00418 enter_state_changed_while_saving (guard); 00419 break; 00420 } 00421 case rssUPDATING: 00422 { 00423 enter_state_changed_while_saving (guard); 00424 break; 00425 } 00426 case rssSAVED: 00427 { 00428 enter_state_changed (guard); 00429 break; 00430 } 00431 case rssCHANGED_WHILE_SAVING: 00432 { 00433 continue_state_changed_while_saving (guard); 00434 break; 00435 } 00436 case rssCHANGED: 00437 { 00438 continue_state_changed (guard); 00439 break; 00440 } 00441 default: 00442 { 00443 ACE_ERROR ((LM_ERROR, 00444 ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected delivery_request_complete in state %d\n"), 00445 static_cast<int> (this->state_) 00446 )); 00447 break; 00448 } 00449 } 00450 }
void TAO_Notify::Routing_Slip::dispatch | ( | TAO_Notify_ProxySupplier * | proxy_supplier, | |
bool | filter | |||
) |
Schedule delivery to a consumer via a proxy supplier.
proxy_supplier | the proxy supplier that will deliver the event | |
filter | should consumer-based filtering be applied? |
Definition at line 336 of file Routing_Slip.cpp.
References ACE_ASSERT, ACE_DEBUG, DEBUG_LEVEL, delivery_requests_, TAO_Notify_Object::execute_task(), TAO_Notify_Object::has_shutdown(), TAO_Notify_Object::id(), LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), rssCREATING, and ACE_Vector< T, DEFAULT_SIZE >::size().
00339 { 00340 // cannot be the first action 00341 ACE_ASSERT (this->state_ != rssCREATING); 00342 00343 TAO_Notify_ProxySupplier::Ptr psgrd(ps); 00344 Routing_Slip_Guard guard (this->internals_); 00345 00346 size_t request_id = delivery_requests_.size (); 00347 00348 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00349 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Dispatch %s; completed %d of %d\n"), 00350 this->sequence_, 00351 static_cast<int> (request_id), 00352 filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"), 00353 static_cast<int> (this->complete_requests_), 00354 static_cast<int> (this->delivery_requests_.size ()) 00355 )); 00356 00357 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id)); 00358 if (! ps->has_shutdown() ) 00359 { 00360 this->delivery_requests_.push_back (request); 00361 TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter); 00362 guard.release (); 00363 if (DEBUG_LEVEL > 8) 00364 ACE_DEBUG ((LM_DEBUG, 00365 "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to " 00366 "proxy supplier %d\n", 00367 this->sequence_, 00368 static_cast<int> (request_id), 00369 ps->id())); 00370 ps->execute_task (method); 00371 } 00372 else 00373 { 00374 if (DEBUG_LEVEL > 5) 00375 ACE_DEBUG ((LM_DEBUG, 00376 "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to " 00377 "proxy supplier %d; already shut down\n", 00378 this->sequence_, 00379 static_cast<int> (request_id), 00380 ps->id())); 00381 } 00382 }
void TAO_Notify::Routing_Slip::enter_state_changed | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 765 of file Routing_Slip.cpp.
References ACE_DEBUG, add_to_persist_queue(), all_deliveries_complete(), count_enter_changed_, DEBUG_LEVEL, enter_state_complete(), LM_DEBUG, rssCHANGED, and state_.
Referenced by delivery_request_complete(), and persist_complete().
00766 { 00767 ++count_enter_changed_; 00768 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00769 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED\n"), 00770 this->sequence_ 00771 )); 00772 // complete state change BEFORE initiating request to avoid 00773 // race condition if request finishes before state is stable. 00774 this->state_ = rssCHANGED; 00775 if (all_deliveries_complete ()) 00776 { 00777 enter_state_complete (guard); 00778 } 00779 add_to_persist_queue (guard); 00780 }
void TAO_Notify::Routing_Slip::enter_state_changed_while_saving | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 746 of file Routing_Slip.cpp.
References ACE_DEBUG, count_enter_changed_while_saving_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), rssCHANGED_WHILE_SAVING, and state_.
Referenced by delivery_request_complete().
00747 { 00748 ++count_enter_changed_while_saving_; 00749 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00750 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED_WHILE_SAVING\n"), 00751 this->sequence_ 00752 )); 00753 this->state_ = rssCHANGED_WHILE_SAVING; 00754 guard.release (); 00755 }
void TAO_Notify::Routing_Slip::enter_state_complete | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 797 of file Routing_Slip.cpp.
References ACE_DEBUG, count_enter_complete_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), rssCOMPLETE, and state_.
Referenced by continue_state_changed(), and enter_state_changed().
00798 { 00799 ++count_enter_complete_; 00800 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00801 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE\n"), 00802 this->sequence_ 00803 )); 00804 this->state_ = rssCOMPLETE; 00805 guard.release (); 00806 }
void TAO_Notify::Routing_Slip::enter_state_complete_while_new | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 614 of file Routing_Slip.cpp.
References ACE_DEBUG, count_enter_complete_while_new_, DEBUG_LEVEL, is_safe_, LM_DEBUG, rssCOMPLETE_WHILE_NEW, state_, and until_safe_.
Referenced by continue_state_new().
00615 { 00616 ++count_enter_complete_while_new_; 00617 ACE_UNUSED_ARG (guard); 00618 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00619 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE_WHILE_NEW\n"), 00620 this->sequence_ 00621 )); 00622 // allow the ConsumerProxy to return from the CORBA push call. 00623 if (! is_safe_) 00624 { 00625 is_safe_ = true; 00626 this->until_safe_.signal (); 00627 } 00628 this->state_ = rssCOMPLETE_WHILE_NEW; 00629 }
void TAO_Notify::Routing_Slip::enter_state_deleting | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 809 of file Routing_Slip.cpp.
References ACE_DEBUG, count_enter_deleting_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), TAO_Notify::Routing_Slip_Persistence_Manager::remove(), rspm_, rssDELETING, and state_.
Referenced by at_front_of_persist_queue().
00810 { 00811 ++count_enter_deleting_; 00812 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00813 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state DELETING\n"), 00814 this->sequence_ 00815 )); 00816 this->state_ = rssDELETING; 00817 guard.release (); 00818 this->rspm_->remove (); 00819 }
void TAO_Notify::Routing_Slip::enter_state_new | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 592 of file Routing_Slip.cpp.
References ACE_DEBUG, add_to_persist_queue(), count_enter_new_, DEBUG_LEVEL, LM_DEBUG, rssNEW, and state_.
Referenced by route().
00593 { 00594 ++count_enter_new_; 00595 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00596 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state NEW\n"), 00597 this->sequence_ 00598 )); 00599 this->state_ = rssNEW; 00600 add_to_persist_queue(guard); 00601 }
void TAO_Notify::Routing_Slip::enter_state_reloaded | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 632 of file Routing_Slip.cpp.
References ACE_DEBUG, count_enter_reloaded_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), rssRELOADED, and state_.
00633 { 00634 ++count_enter_reloaded_; 00635 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00636 ACE_TEXT ("(%P|%t) Routing Slip #&d: enter state RELOADED\n"), 00637 this->sequence_ 00638 )); 00639 this->state_ = rssRELOADED; 00640 guard.release(); 00641 }
void TAO_Notify::Routing_Slip::enter_state_saved | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 714 of file Routing_Slip.cpp.
References ACE_DEBUG, count_enter_saved_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), rssSAVED, and state_.
Referenced by persist_complete(), and reconnect().
00715 { 00716 ++count_enter_saved_; 00717 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00718 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVED\n"), 00719 this->sequence_ 00720 )); 00721 this->state_ = rssSAVED; 00722 guard.release (); 00723 }
void TAO_Notify::Routing_Slip::enter_state_saving | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 681 of file Routing_Slip.cpp.
References ACE_DEBUG, ACE_OutputCDR::begin(), TAO_Notify::Routing_Slip_Queue::complete(), count_enter_saving_, create_persistence_manager(), DEBUG_LEVEL, enter_state_transient(), event_, LM_DEBUG, marshal(), persistent_queue_, ACE_Guard< ACE_LOCK >::release(), rspm_, rssSAVING, state_, and TAO_Notify::Routing_Slip_Persistence_Manager::store().
Referenced by at_front_of_persist_queue().
00682 { 00683 ++count_enter_saving_; 00684 if (!create_persistence_manager ()) 00685 { 00686 // Note This should actually be a throw (out of memory) 00687 // but we cheat and make this a transient event. 00688 guard.release (); 00689 this->persistent_queue_.complete (); 00690 enter_state_transient (guard); 00691 } 00692 else 00693 { 00694 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00695 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVING\n"), 00696 this->sequence_ 00697 )); 00698 this->state_ = rssSAVING; 00699 00700 TAO_OutputCDR event_cdr; 00701 this->event_->marshal (event_cdr); 00702 00703 const ACE_Message_Block *event_mb = event_cdr.begin (); 00704 TAO_OutputCDR rs_cdr; 00705 marshal (rs_cdr); 00706 const ACE_Message_Block *rs_mb = rs_cdr.begin (); 00707 00708 guard.release (); 00709 this->rspm_->store (*event_mb, *rs_mb); 00710 } 00711 }
void TAO_Notify::Routing_Slip::enter_state_terminal | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 822 of file Routing_Slip.cpp.
References ACE_ASSERT, ACE_DEBUG, count_enter_terminal_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), ACE_Strong_Bound_Ptr< X, ACE_LOCK >::reset(), rssTERMINAL, state_, and this_ptr_.
Referenced by at_front_of_persist_queue(), continue_state_transient(), enter_state_transient(), and persist_complete().
00823 { 00824 ++count_enter_terminal_; 00825 ACE_ASSERT( this->is_safe_); 00826 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00827 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TERMINAL\n"), 00828 this->sequence_ 00829 )); 00830 this->state_ = rssTERMINAL; 00831 this->this_ptr_.reset (); 00832 guard.release (); 00833 }
void TAO_Notify::Routing_Slip::enter_state_transient | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 644 of file Routing_Slip.cpp.
References ACE_DEBUG, all_deliveries_complete(), count_enter_transient_, DEBUG_LEVEL, enter_state_terminal(), is_safe_, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), rssTRANSIENT, state_, and until_safe_.
Referenced by enter_state_saving(), and route().
00645 { 00646 ++count_enter_transient_; 00647 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00648 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TRANSIENT\n"), 00649 this->sequence_ 00650 )); 00651 this->state_ = rssTRANSIENT; 00652 if (! is_safe_) 00653 { 00654 is_safe_ = true; 00655 this->until_safe_.signal (); 00656 } 00657 if (all_deliveries_complete ()) 00658 { 00659 enter_state_terminal (guard); 00660 } 00661 else 00662 { 00663 guard.release (); 00664 } 00665 }
void TAO_Notify::Routing_Slip::enter_state_updating | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 726 of file Routing_Slip.cpp.
References ACE_ASSERT, ACE_DEBUG, ACE_OutputCDR::begin(), count_enter_updating_, DEBUG_LEVEL, LM_DEBUG, marshal(), ACE_Guard< ACE_LOCK >::release(), rspm_, rssUPDATING, state_, and TAO_Notify::Routing_Slip_Persistence_Manager::update().
Referenced by at_front_of_persist_queue().
00727 { 00728 ++count_enter_updating_; 00729 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00730 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state UPDATING\n"), 00731 this->sequence_ 00732 )); 00733 this->state_ = rssUPDATING; 00734 00735 TAO_OutputCDR rs_cdr; 00736 marshal (rs_cdr); 00737 const ACE_Message_Block *rs_mb = rs_cdr.begin (); 00738 guard.release (); 00739 00740 ACE_ASSERT (this->rspm_ != 0); 00741 this->rspm_->update (*rs_mb); 00742 }
const TAO_Notify_Event::Ptr & TAO_Notify::Routing_Slip::event | ( | ) | const |
Definition at line 213 of file Routing_Slip.cpp.
References event_.
Referenced by create().
00214 { 00215 return this->event_; 00216 }
void TAO_Notify::Routing_Slip::marshal | ( | TAO_OutputCDR & | cdr | ) | [private] |
Marshal into a CDR.
Definition at line 836 of file Routing_Slip.cpp.
References delivery_requests_, ACE_Vector< T, DEFAULT_SIZE >::size(), and ACE_OutputCDR::write_ulong().
Referenced by enter_state_saving(), and enter_state_updating().
00837 { 00838 size_t request_count = this->delivery_requests_.size(); 00839 cdr.write_ulong ( 00840 ACE_Utils::truncate_cast<CORBA::ULong> (request_count - this->complete_requests_)); 00841 for (size_t nreq = 0; nreq < request_count; ++nreq) 00842 { 00843 Delivery_Request * request = this->delivery_requests_[nreq].get (); 00844 if (request != 0) 00845 { 00846 request->marshal (cdr); 00847 } 00848 } 00849 }
void TAO_Notify::Routing_Slip::persist_complete | ( | ) | [virtual] |
The persistent storage has completed the last request.
Implements TAO_Notify::Persistent_Callback.
Definition at line 510 of file Routing_Slip.cpp.
References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, TAO_Notify::Routing_Slip_Queue::complete(), DEBUG_LEVEL, enter_state_changed(), enter_state_saved(), enter_state_terminal(), is_safe_, LM_DEBUG, LM_ERROR, ACE_Guard< ACE_LOCK >::locked(), persistent_queue_, ACE_Guard< ACE_LOCK >::release(), rssCHANGED_WHILE_SAVING, rssDELETING, rssSAVING, rssUPDATING, state_, and until_safe_.
00511 { 00512 // keep this object around til this method returns. 00513 Routing_Slip_Ptr me(this->this_ptr_); 00514 Routing_Slip_Guard guard (this->internals_); 00515 ACE_ASSERT (guard.locked ()); 00516 00517 // allow the ConsumerProxy to return from the CORBA push call. 00518 if (! is_safe_) 00519 { 00520 is_safe_ = true; 00521 this->until_safe_.signal (); 00522 } 00523 00524 State state = this->state_; 00525 switch (state) 00526 { 00527 case rssSAVING: 00528 { 00529 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00530 ACE_TEXT ("(%P|%t) Routing Slip #%d: SAVING persist complete\n"), 00531 this->sequence_ 00532 )); 00533 enter_state_saved(guard); 00534 break; 00535 } 00536 case rssCHANGED_WHILE_SAVING: 00537 { 00538 enter_state_changed (guard); 00539 break; 00540 } 00541 case rssUPDATING: 00542 { 00543 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00544 ACE_TEXT ("(%P|%t) Routing Slip #%d: UPDATING persist complete\n"), 00545 this->sequence_ 00546 )); 00547 enter_state_saved (guard); 00548 break; 00549 } 00550 case rssDELETING: 00551 { 00552 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00553 ACE_TEXT ("(%P|%t) Routing Slip #%d: DELETING persist complete\n"), 00554 this->sequence_ 00555 )); 00556 enter_state_terminal (guard); 00557 break; 00558 } 00559 default: 00560 { 00561 ACE_ERROR ((LM_ERROR, 00562 ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected transition in state %d\n"), 00563 static_cast<int> (this->state_) 00564 )); 00565 guard.release (); 00566 break; 00567 } 00568 } 00569 this->persistent_queue_.complete (); 00570 }
void TAO_Notify::Routing_Slip::reconnect | ( | void | ) |
Definition at line 908 of file Routing_Slip.cpp.
References ACE_Vector< T, DEFAULT_SIZE >::clear(), delivery_methods_, enter_state_saved(), and ACE_Vector< T, DEFAULT_SIZE >::size().
00909 { 00910 Routing_Slip_Guard guard (this->internals_); 00911 enter_state_saved (guard); 00912 00913 //@@todo is there a worker_task available to do this? 00914 size_t count = this->delivery_methods_.size (); 00915 for (size_t nmethod = 0; nmethod < count; ++nmethod) 00916 { 00917 this->delivery_methods_[nmethod]->execute (); 00918 } 00919 this->delivery_methods_.clear (); 00920 }
void TAO_Notify::Routing_Slip::route | ( | TAO_Notify_ProxyConsumer * | pc, | |
bool | reliable_channel | |||
) |
Route this event to destinations must be the Action request after the routing slip is created.
Definition at line 229 of file Routing_Slip.cpp.
References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, delivery_requests_, enter_state_new(), enter_state_transient(), TAO_Notify_Object::execute_task(), LM_DEBUG, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Guard< ACE_LOCK >::release(), rssCREATING, and ACE_Vector< T, DEFAULT_SIZE >::size().
00230 { 00231 ACE_ASSERT(pc != 0); 00232 00233 TAO_Notify_ProxyConsumer::Ptr pcgrd(pc); 00234 00235 Routing_Slip_Guard guard (this->internals_); 00236 00237 size_t request_id = delivery_requests_.size (); 00238 00239 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, 00240 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: lookup, completed %d of %d\n"), 00241 this->sequence_, 00242 static_cast<int> (request_id), 00243 static_cast<int> (this->complete_requests_), 00244 static_cast<int> (this->delivery_requests_.size ()) 00245 )); 00246 00247 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id)); 00248 this->delivery_requests_.push_back (request); 00249 TAO_Notify_Method_Request_Lookup_Queueable method (request, pc); 00250 00251 if (this->state_ == rssCREATING) 00252 { 00253 if (! reliable_channel) 00254 { 00255 enter_state_transient (guard); 00256 } 00257 else if (ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence") == 0) 00258 { 00259 enter_state_transient (guard); 00260 } 00261 else if (! this->event_->reliable().is_valid()) 00262 { 00263 enter_state_new (guard); 00264 } 00265 else if (this->event_->reliable().value() == true) 00266 { 00267 enter_state_new (guard); 00268 } 00269 else 00270 { 00271 enter_state_transient (guard); 00272 } 00273 } 00274 else 00275 { 00276 // We only need to release the guard if the state is rssCREATING. 00277 // By calling enter_state_*, we are guaranteed that the guard has 00278 // been released. 00279 guard.release (); 00280 } 00281 pc->execute_task (method); 00282 }
int TAO_Notify::Routing_Slip::sequence | ( | ) | const |
Provide an identifying number for this Routing Slip to use in debug messages.
Definition at line 923 of file Routing_Slip.cpp.
References sequence_.
00924 { 00925 return this->sequence_; 00926 }
void TAO_Notify::Routing_Slip::set_rspm | ( | Routing_Slip_Persistence_Manager * | rspm | ) |
Definition at line 158 of file Routing_Slip.cpp.
References rspm_, and TAO_Notify::Routing_Slip_Persistence_Manager::set_callback().
Referenced by create_persistence_manager().
00159 { 00160 this->rspm_ = rspm; 00161 if (rspm_ != 0) 00162 { 00163 rspm->set_callback (this); 00164 } 00165 }
bool TAO_Notify::Routing_Slip::should_retry | ( | ) | const |
Should delivery of this event be retried if it fails?
Definition at line 929 of file Routing_Slip.cpp.
References rssTRANSIENT, and state_.
00930 { 00931 // simple minded test: if it's transient, don't retry it 00932 // @@todo Eventually this should check timeout, discard policy, etc. 00933 return this->state_ != rssTRANSIENT; 00934 }
bool TAO_Notify::Routing_Slip::unmarshal | ( | TAO_Notify_EventChannelFactory & | ecf, | |
TAO_InputCDR & | rscdr | |||
) | [private] |
Marshal from CDR.
Definition at line 852 of file Routing_Slip.cpp.
References ACE_NEW_THROW_EX, delivery_methods_, delivery_requests_, TAO_Notify_Method_Request_Lookup::persistence_code, TAO_Notify_Method_Request_Dispatch::persistence_code, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_InputCDR::read_octet(), ACE_InputCDR::read_ulong(), ACE_Vector< T, DEFAULT_SIZE >::size(), this_ptr_, TAO_Notify_Method_Request_Lookup::unmarshal(), and TAO_Notify_Method_Request_Dispatch::unmarshal().
00853 { 00854 CORBA::ULong count = 0; 00855 cdr.read_ulong (count); 00856 for (size_t nreq = 0; nreq < count; ++nreq) 00857 { 00858 ACE_CDR::Octet code = 0; 00859 while (cdr.read_octet(code)) 00860 { 00861 try 00862 { 00863 if (code == TAO_Notify_Method_Request_Dispatch::persistence_code) 00864 { 00865 Delivery_Request * prequest; 00866 ACE_NEW_THROW_EX ( 00867 prequest, 00868 Delivery_Request(this_ptr_, this->delivery_requests_.size ()), 00869 CORBA::NO_MEMORY ()); 00870 Delivery_Request_Ptr request(prequest); 00871 TAO_Notify_Method_Request_Dispatch_Queueable * method = 00872 TAO_Notify_Method_Request_Dispatch::unmarshal ( 00873 request, 00874 ecf, 00875 cdr); 00876 if (method != 0) 00877 { 00878 this->delivery_requests_.push_back (request); 00879 this->delivery_methods_.push_back (method); 00880 } 00881 } 00882 else if (code == TAO_Notify_Method_Request_Lookup::persistence_code) 00883 { 00884 Delivery_Request_Ptr request(new Delivery_Request(this_ptr_, this->delivery_requests_.size ())); 00885 TAO_Notify_Method_Request_Lookup_Queueable * method = 00886 TAO_Notify_Method_Request_Lookup::unmarshal ( 00887 request, 00888 ecf, 00889 cdr); 00890 if (method != 0) 00891 { 00892 this->delivery_requests_.push_back (request); 00893 this->delivery_methods_.push_back (method); 00894 } 00895 } 00896 } 00897 catch (const CORBA::Exception&) 00898 { 00899 // @@todo should we log this? 00900 // just ignore failures 00901 } 00902 } 00903 } 00904 return this->delivery_requests_.size () > 0; 00905 }
void TAO_Notify::Routing_Slip::wait_persist | ( | ) |
Wait until the event/routing_slip has been saved at least once.
Definition at line 219 of file Routing_Slip.cpp.
References until_safe_.
00220 { 00221 Routing_Slip_Guard guard (this->internals_); 00222 while (!this->is_safe_) 00223 { 00224 this->until_safe_.wait (); 00225 } 00226 }
size_t TAO_Notify::Routing_Slip::complete_requests_ [private] |
How many delivery requests are complete.
Definition at line 207 of file Routing_Slip.h.
Referenced by all_deliveries_complete(), and delivery_request_complete().
size_t TAO_Notify::Routing_Slip::count_continue_changed_ = 0 [static, private] |
Definition at line 228 of file Routing_Slip.h.
Referenced by continue_state_changed(), and create().
size_t TAO_Notify::Routing_Slip::count_continue_changed_while_saving_ = 0 [static, private] |
size_t TAO_Notify::Routing_Slip::count_continue_new_ = 0 [static, private] |
size_t TAO_Notify::Routing_Slip::count_continue_transient_ = 0 [static, private] |
Definition at line 217 of file Routing_Slip.h.
Referenced by continue_state_transient(), and create().
size_t TAO_Notify::Routing_Slip::count_enter_changed_ = 0 [static, private] |
size_t TAO_Notify::Routing_Slip::count_enter_changed_while_saving_ = 0 [static, private] |
Definition at line 225 of file Routing_Slip.h.
Referenced by create(), and enter_state_changed_while_saving().
size_t TAO_Notify::Routing_Slip::count_enter_complete_ = 0 [static, private] |
size_t TAO_Notify::Routing_Slip::count_enter_complete_while_new_ = 0 [static, private] |
Definition at line 221 of file Routing_Slip.h.
Referenced by create(), and enter_state_complete_while_new().
size_t TAO_Notify::Routing_Slip::count_enter_deleting_ = 0 [static, private] |
size_t TAO_Notify::Routing_Slip::count_enter_new_ = 0 [static, private] |
size_t TAO_Notify::Routing_Slip::count_enter_reloaded_ = 0 [static, private] |
size_t TAO_Notify::Routing_Slip::count_enter_saved_ = 0 [static, private] |
size_t TAO_Notify::Routing_Slip::count_enter_saving_ = 0 [static, private] |
size_t TAO_Notify::Routing_Slip::count_enter_terminal_ = 0 [static, private] |
size_t TAO_Notify::Routing_Slip::count_enter_transient_ = 0 [static, private] |
size_t TAO_Notify::Routing_Slip::count_enter_updating_ = 0 [static, private] |
Methods that should be restarted during event recovery.
Definition at line 204 of file Routing_Slip.h.
Referenced by reconnect(), and unmarshal().
A collection of delivery requests.
Definition at line 201 of file Routing_Slip.h.
Referenced by all_deliveries_complete(), delivery_request_complete(), dispatch(), marshal(), route(), and unmarshal().
bool TAO_Notify::Routing_Slip::is_safe_ [private] |
true when event persistence qos is guaranteed
Definition at line 169 of file Routing_Slip.h.
Referenced by enter_state_complete_while_new(), enter_state_transient(), and persist_complete().
Routing_Slip_Queue TAO_Notify::Routing_Slip::persistent_queue_ [static, private] |
Definition at line 233 of file Routing_Slip.h.
Referenced by add_to_persist_queue(), at_front_of_persist_queue(), enter_state_saving(), and persist_complete().
int TAO_Notify::Routing_Slip::routing_slip_sequence_ = 0 [static, private] |
Pointer to a Routing_Slip_Persistence_Manager.
Definition at line 210 of file Routing_Slip.h.
Referenced by create_persistence_manager(), enter_state_deleting(), enter_state_saving(), enter_state_updating(), and set_rspm().
int TAO_Notify::Routing_Slip::sequence_ [private] |
TAO_SYNCH_MUTEX TAO_Notify::Routing_Slip::sequence_lock_ [static, private] |
enum TAO_Notify::Routing_Slip::State TAO_Notify::Routing_Slip::state_ [private] |
A mini-state machine to control persistence See external doc for circles and arrows.
Referenced by at_front_of_persist_queue(), delivery_request_complete(), enter_state_changed(), enter_state_changed_while_saving(), enter_state_complete(), enter_state_complete_while_new(), enter_state_deleting(), enter_state_new(), enter_state_reloaded(), enter_state_saved(), enter_state_saving(), enter_state_terminal(), enter_state_transient(), enter_state_updating(), persist_complete(), and should_retry().
Smart pointer to this object Provides continuity between smart pointers and "Routing_Slip::this" Also lets the Routing_Slip manage its own minimum lifetime.
Definition at line 176 of file Routing_Slip.h.
Referenced by enter_state_terminal(), and unmarshal().
signalled when is_safe_ goes true
Definition at line 171 of file Routing_Slip.h.
Referenced by enter_state_complete_while_new(), enter_state_transient(), persist_complete(), and wait_persist().