#include <Routing_Slip.h>
Inheritance diagram for TAO_Notify::Routing_Slip:


Interacts with persistent storage to provide reliable delivery.
Definition at line 66 of file Routing_Slip.h.
|
|
|
A mini-state machine to control persistence. See external doc for circles and arrows.
Definition at line 184 of file Routing_Slip.h.
00185 {
00186 rssCREATING,
00187 rssTRANSIENT,
00188 rssRELOADED,
00189 rssNEW,
00190 rssCOMPLETE_WHILE_NEW,
00191 rssSAVING,
00192 rssSAVED,
00193 rssUPDATING,
00194 rssCHANGED_WHILE_SAVING,
00195 rssCHANGED,
00196 rssCOMPLETE,
00197 rssDELETING,
00198 rssTERMINAL
00199 } state_;
|
|
|
Destructor (should be private, but that inspires compiler wars).
Definition at line 188 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, and LM_DEBUG.
00189 {
00190 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00191 ACE_TEXT ("(%P|%t) Routing Slip #%d: destructor\n"),
00192 this->sequence_
00193 ));
00194 }
|
|
|
Private constructor for use by create method.
Definition at line 170 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, routing_slip_sequence_, sequence_, and sequence_lock_.
00172 : is_safe_ (false)
00173 , until_safe_ (internals_)
00174 , this_ptr_ (0)
00175 , event_(event)
00176 , state_ (rssCREATING)
00177 , complete_requests_ (0)
00178 , rspm_ (0)
00179 {
00180 Routing_Slip_Guard guard (sequence_lock_);
00181 this->sequence_ = ++routing_slip_sequence_;
00182 if (DEBUG_LEVEL > 1) ACE_DEBUG ((LM_DEBUG,
00183 ACE_TEXT ("(%P|%t) Routing Slip #%d: constructor\n"),
00184 this->sequence_
00185 ));
00186 }
|
|
|
This routing_slip needs to be saved.
Definition at line 578 of file Routing_Slip.cpp. References ACE_Guard< ACE_LOCK >::acquire(), TAO_Notify::Routing_Slip_Queue::add(), persistent_queue_, ACE_Guard< ACE_LOCK >::release(), and Routing_Slip_Guard. Referenced by enter_state_changed(), and enter_state_new().
00579 {
00580 guard.release ();
00581 this->persistent_queue_.add (this->this_ptr_);
00582 guard.acquire (); // necessary?
00583 }
|
|
|
Test to see if all deliveries are complete.
Definition at line 572 of file Routing_Slip.cpp. References complete_requests_, delivery_requests_, and ACE_Vector< T, DEFAULT_SIZE >::size(). Referenced by continue_state_changed(), continue_state_new(), continue_state_transient(), enter_state_changed(), and enter_state_transient().
00573 {
00574 return this->complete_requests_ == this->delivery_requests_.size ();
00575 }
|
|
|
This Routing_Slip reached the front of the persistence queue.
Definition at line 451 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), TAO_Notify::Routing_Slip_Queue::complete(), DEBUG_LEVEL, enter_state_deleting(), enter_state_saving(), enter_state_terminal(), enter_state_updating(), LM_DEBUG, LM_ERROR, persistent_queue_, Routing_Slip_Guard, rssCHANGED, rssCOMPLETE, rssCOMPLETE_WHILE_NEW, and rssNEW.
00452 {
00453 Routing_Slip_Guard guard (this->internals_);
00454 State state = this->state_;
00455 switch (state)
00456 {
00457 case rssNEW:
00458 {
00459 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00460 ACE_TEXT ("(%P|%t) Routing Slip #%d: NEW Reached front of queue\n"),
00461 this->sequence_
00462 ));
00463 enter_state_saving (guard);
00464 break;
00465 }
00466 case rssCOMPLETE_WHILE_NEW:
00467 {
00468 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00469 ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE_WHILE_NEW Reached front of queue\n"),
00470 this->sequence_
00471 ));
00472 this->persistent_queue_.complete ();
00473 enter_state_terminal (guard);
00474 break;
00475 }
00476 case rssCHANGED:
00477 {
00478 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00479 ACE_TEXT ("(%P|%t) Routing Slip #%d: CHANGED Reached front of queue\n"),
00480 this->sequence_
00481 ));
00482 enter_state_updating (guard);
00483 break;
00484 }
00485 case rssCOMPLETE:
00486 {
00487 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00488 ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE Reached front of queue\n"),
00489 this->sequence_
00490 ));
00491 enter_state_deleting (guard);
00492 break;
00493 }
00494 default:
00495 {
00496 ACE_ERROR ((LM_ERROR,
00497 ACE_TEXT ("(%P|%t) Routing Slip %d: Unexpected at_front_of_persist_queue in state %d\n"),
00498 this->sequence_,
00499 static_cast<int> (this->state_)
00500 ));
00501 break;
00502 }
00503 }
00504 }
|
|
|
Definition at line 774 of file Routing_Slip.cpp. References all_deliveries_complete(), count_continue_changed_, enter_state_complete(), and Routing_Slip_Guard. Referenced by delivery_request_complete().
00775 {
00776 ++count_continue_changed_;
00777 if (all_deliveries_complete ())
00778 {
00779 enter_state_complete (guard);
00780 }
00781 }
|
|
|
Definition at line 749 of file Routing_Slip.cpp. References Routing_Slip_Guard. Referenced by delivery_request_complete().
00750 {
00751 ACE_UNUSED_ARG (guard);
00752 // no action necessary
00753 }
|
|
|
Definition at line 601 of file Routing_Slip.cpp. References all_deliveries_complete(), count_continue_new_, enter_state_complete_while_new(), and Routing_Slip_Guard. Referenced by delivery_request_complete().
00602 {
00603 ++count_continue_new_;
00604 if (all_deliveries_complete ())
00605 {
00606 this->enter_state_complete_while_new (guard);
00607 }
00608 }
|
|
|
Definition at line 661 of file Routing_Slip.cpp. References all_deliveries_complete(), count_continue_transient_, enter_state_terminal(), and Routing_Slip_Guard. Referenced by delivery_request_complete().
00662 {
00663 ++count_continue_transient_;
00664 if (all_deliveries_complete ())
00665 {
00666 enter_state_terminal (guard);
00667 }
00668 }
|
|
||||||||||||
|
"Factory" method for use during reload from persistent storage.
Definition at line 108 of file Routing_Slip.cpp. References ACE_CATCHANY, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, create(), event(), LM_ERROR, TAO_Notify::Routing_Slip_Persistence_Manager::reload(), ACE_Strong_Bound_Ptr< X, ACE_LOCK >::reset(), and TAO_Notify::Routing_Slip_Ptr.
00111 {
00112 Routing_Slip_Ptr result;
00113 ACE_Message_Block * event_mb = 0;
00114 ACE_Message_Block * rs_mb = 0;
00115 ACE_DECLARE_NEW_ENV;
00116 ACE_TRY
00117 {
00118 if (rspm->reload (event_mb, rs_mb))
00119 {
00120 TAO_InputCDR cdr_event (event_mb);
00121 TAO_Notify_Event::Ptr event (TAO_Notify_Event::unmarshal (cdr_event));
00122 if (event.isSet())
00123 {
00124 result = create (event ACE_ENV_ARG_PARAMETER);
00125 ACE_TRY_CHECK;
00126 TAO_InputCDR cdr_rs (rs_mb);
00127 if ( result->unmarshal (ecf, cdr_rs))
00128 {
00129 result->set_rspm (rspm);
00130 }
00131 else
00132 {
00133 ACE_ERROR ((LM_ERROR,
00134 ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for routing slip.\n")
00135 ));
00136 result.reset ();
00137 }
00138 }
00139 else
00140 {
00141 ACE_ERROR ((LM_ERROR,
00142 ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for event.\n")
00143 ));
00144 }
00145 }
00146 }
00147 ACE_CATCHANY
00148 {
00149 ACE_ERROR ((LM_ERROR,
00150 ACE_TEXT ("(%P|%t) Routing_Slip::create: Exception reloading event.\n")
00151 ));
00152 }
00153 ACE_ENDTRY;
00154 delete event_mb;
00155 delete rs_mb;
00156
00157 return result;
00158 }
|
|
|
"Factory" method for normal use.
Definition at line 56 of file Routing_Slip.cpp. References ACE_CHECK_RETURN, ACE_ERROR, ACE_NEW_THROW_EX, ACE_TEXT(), count_continue_changed_, count_continue_changed_while_saving_, count_continue_new_, count_continue_transient_, count_enter_changed_, count_enter_changed_while_saving_, count_enter_complete_, count_enter_complete_while_new_, count_enter_deleting_, count_enter_new_, count_enter_reloaded_, count_enter_saved_, count_enter_saving_, count_enter_terminal_, count_enter_transient_, count_enter_updating_, DEBUG_LEVEL, LM_ERROR, and TAO_Notify::Routing_Slip_Ptr. Referenced by create(), TAO_Notify_EventChannelFactory::load_event_persistence(), and TAO_Notify_ProxyConsumer::push_i().
00057 {
00058 Routing_Slip * prs;
00059 ACE_NEW_THROW_EX (prs, Routing_Slip (event), CORBA::NO_MEMORY ());
00060 ACE_CHECK_RETURN (Routing_Slip_Ptr());
00061 Routing_Slip_Ptr result(prs);
00062 result->this_ptr_ = result; // let the pointers touch so they use the same ref count
00063
00064 // note we don't care about ultra-precise stats, so no guard for these
00065 if (DEBUG_LEVEL > 8 && ((result->sequence_ % 100) == 0))
00066 {
00067 ACE_ERROR ((LM_ERROR,
00068 ACE_TEXT ("(%P|%t) Routing_Slip_Statistics\n")
00069 ACE_TEXT (" enter_transient \t%d\n")
00070 ACE_TEXT (" continue_transient \t%d\n")
00071 ACE_TEXT (" enter_reloaded \t%d\n")
00072 ACE_TEXT (" enter_new \t%d\n")
00073 ACE_TEXT (" continue_new \t%d\n")
00074 ACE_TEXT (" enter_complete_while_new \t%d\n")
00075 ACE_TEXT (" enter_saving \t%d\n")
00076 ACE_TEXT (" enter_saved \t%d\n")
00077 ACE_TEXT (" enter_updating \t%d\n")
00078 ACE_TEXT (" enter_changed_while_saving \t%d\n")
00079 ACE_TEXT (" continue_changed_while_saving\t%d\n")
00080 ACE_TEXT (" enter_changed \t%d\n")
00081 ACE_TEXT (" continue_changed \t%d\n")
00082 ACE_TEXT (" enter_complete \t%d\n")
00083 ACE_TEXT (" enter_deleting \t%d\n")
00084 ACE_TEXT (" enter_terminal \t%d\n")
00085 , static_cast<int> (count_enter_transient_)
00086 , static_cast<int> (count_continue_transient_)
00087 , static_cast<int> (count_enter_reloaded_)
00088 , static_cast<int> (count_enter_new_)
00089 , static_cast<int> (count_continue_new_)
00090 , static_cast<int> (count_enter_complete_while_new_)
00091 , static_cast<int> (count_enter_saving_)
00092 , static_cast<int> (count_enter_saved_)
00093 , static_cast<int> (count_enter_updating_)
00094 , static_cast<int> (count_enter_changed_while_saving_)
00095 , static_cast<int> (count_continue_changed_while_saving_)
00096 , static_cast<int> (count_enter_changed_)
00097 , static_cast<int> (count_continue_changed_)
00098 , static_cast<int> (count_enter_complete_)
00099 , static_cast<int> (count_enter_deleting_)
00100 , static_cast<int> (count_enter_terminal_)
00101 ));
00102 }
00103 return result;
00104 }
|
|
|
Definition at line 197 of file Routing_Slip.cpp. References TAO_Notify::Event_Persistence_Factory::create_routing_slip_persistence_manager(), TAO_Notify::Event_Persistence_Strategy::get_factory(), rspm_, and set_rspm(). Referenced by enter_state_saving().
00198 {
00199 if (this->rspm_ == 0)
00200 {
00201 Event_Persistence_Strategy * strategy =
00202 ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
00203 if (strategy != 0)
00204 {
00205 Event_Persistence_Factory * factory = strategy->get_factory ();
00206 if (factory != 0)
00207 {
00208 set_rspm (factory->create_routing_slip_persistence_manager(this));
00209 }
00210 }
00211 }
00212 return this->rspm_ != 0;
00213 }
|
|
|
A delivery request has been satisfied.
Definition at line 386 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, ACE_TEXT(), complete_requests_, continue_state_changed(), continue_state_changed_while_saving(), continue_state_new(), continue_state_transient(), DEBUG_LEVEL, delivery_requests_, enter_state_changed(), enter_state_changed_while_saving(), LM_DEBUG, LM_ERROR, Routing_Slip_Guard, rssCHANGED, rssCHANGED_WHILE_SAVING, rssNEW, rssSAVED, rssSAVING, rssTRANSIENT, rssUPDATING, and ACE_Vector< T, DEFAULT_SIZE >::size().
00387 {
00388 Routing_Slip_Guard guard (this->internals_);
00389 ACE_ASSERT (request_id < this->delivery_requests_.size ());
00390 // reset the pointer to allow the delivery_request to be deleted.
00391 this->delivery_requests_[request_id].reset ();
00392 this->complete_requests_ += 1;
00393
00394 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00395 ACE_TEXT ("(%P|%t) Routing Slip #%d: delivery_request_complete #%d: completed %d of %d\n"),
00396 this->sequence_,
00397 static_cast<int> (request_id),
00398 static_cast<int> (this->complete_requests_),
00399 static_cast<int> (this->delivery_requests_.size ())
00400 ));
00401 State state = this->state_;
00402 switch (state)
00403 {
00404 case rssTRANSIENT:
00405 {
00406 continue_state_transient (guard);
00407 break;
00408 }
00409 case rssNEW:
00410 {
00411 continue_state_new (guard);
00412 break;
00413 }
00414 case rssSAVING:
00415 {
00416 enter_state_changed_while_saving (guard);
00417 break;
00418 }
00419 case rssUPDATING:
00420 {
00421 enter_state_changed_while_saving (guard);
00422 break;
00423 }
00424 case rssSAVED:
00425 {
00426 enter_state_changed (guard);
00427 break;
00428 }
00429 case rssCHANGED_WHILE_SAVING:
00430 {
00431 continue_state_changed_while_saving (guard);
00432 break;
00433 }
00434 case rssCHANGED:
00435 {
00436 continue_state_changed (guard);
00437 break;
00438 }
00439 default:
00440 {
00441 ACE_ERROR ((LM_ERROR,
00442 ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected delivery_request_complete in state %d\n"),
00443 static_cast<int> (this->state_)
00444 ));
00445 break;
00446 }
00447 }
00448 }
|
|
||||||||||||
|
Schedule delivery to a consumer via a proxy supplier.
Definition at line 332 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_CHECK, ACE_DEBUG, ACE_ENV_ARG_PARAMETER, ACE_TEXT(), DEBUG_LEVEL, TAO_Notify::Delivery_Request_Ptr, delivery_requests_, TAO_Notify_Object::execute_task(), TAO_Notify_Object::has_shutdown(), TAO_Notify_Object::id(), LM_DEBUG, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rssCREATING, and ACE_Vector< T, DEFAULT_SIZE >::size().
00336 {
00337 // cannot be the first action
00338 ACE_ASSERT (this->state_ != rssCREATING);
00339
00340 TAO_Notify_ProxySupplier::Ptr psgrd(ps);
00341 Routing_Slip_Guard guard (this->internals_);
00342
00343 size_t request_id = delivery_requests_.size ();
00344
00345 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00346 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Dispatch %s; completed %d of %d\n"),
00347 this->sequence_,
00348 static_cast<int> (request_id),
00349 filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"),
00350 static_cast<int> (this->complete_requests_),
00351 static_cast<int> (this->delivery_requests_.size ())
00352 ));
00353
00354 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00355 if (! ps->has_shutdown() )
00356 {
00357 this->delivery_requests_.push_back (request);
00358 TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter);
00359 guard.release ();
00360 if (DEBUG_LEVEL > 8)
00361 ACE_DEBUG ((LM_DEBUG,
00362 "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to "
00363 "proxy supplier %d\n",
00364 this->sequence_,
00365 static_cast<int> (request_id),
00366 ps->id()));
00367 ps->execute_task (method ACE_ENV_ARG_PARAMETER);
00368 ACE_CHECK;
00369 }
00370 else
00371 {
00372 if (DEBUG_LEVEL > 5)
00373 ACE_DEBUG ((LM_DEBUG,
00374 "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to "
00375 "proxy supplier %d; already shut down\n",
00376 this->sequence_,
00377 static_cast<int> (request_id),
00378 ps->id()));
00379 }
00380 }
|
|
|
Definition at line 756 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), add_to_persist_queue(), all_deliveries_complete(), count_enter_changed_, DEBUG_LEVEL, enter_state_complete(), LM_DEBUG, Routing_Slip_Guard, and rssCHANGED. Referenced by delivery_request_complete(), and persist_complete().
00757 {
00758 ++count_enter_changed_;
00759 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00760 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED\n"),
00761 this->sequence_
00762 ));
00763 // complete state change BEFORE initiating request to avoid
00764 // race condition if request finishes before state is stable.
00765 this->state_ = rssCHANGED;
00766 if (all_deliveries_complete ())
00767 {
00768 enter_state_complete (guard);
00769 }
00770 add_to_persist_queue (guard);
00771 }
|
|
|
Definition at line 737 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_changed_while_saving_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssCHANGED_WHILE_SAVING. Referenced by delivery_request_complete().
00738 {
00739 ++count_enter_changed_while_saving_;
00740 ACE_UNUSED_ARG (guard);
00741 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00742 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED_WHILE_SAVING\n"),
00743 this->sequence_
00744 ));
00745 this->state_ = rssCHANGED_WHILE_SAVING;
00746 }
|
|
|
Definition at line 784 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_complete_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssCOMPLETE. Referenced by continue_state_changed(), and enter_state_changed().
00785 {
00786 ++count_enter_complete_;
00787 ACE_UNUSED_ARG (guard);
00788 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00789 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE\n"),
00790 this->sequence_
00791 ));
00792 this->state_ = rssCOMPLETE;
00793 }
|
|
|
Definition at line 610 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_complete_while_new_, DEBUG_LEVEL, is_safe_, LM_DEBUG, Routing_Slip_Guard, rssCOMPLETE_WHILE_NEW, and until_safe_. Referenced by continue_state_new().
00611 {
00612 ++count_enter_complete_while_new_;
00613 ACE_UNUSED_ARG (guard);
00614 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00615 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE_WHILE_NEW\n"),
00616 this->sequence_
00617 ));
00618 // allow the ConsumerProxy to return from the CORBA push call.
00619 if (! is_safe_)
00620 {
00621 is_safe_ = true;
00622 this->until_safe_.signal ();
00623 }
00624 this->state_ = rssCOMPLETE_WHILE_NEW;
00625 }
|
|
|
Definition at line 796 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), count_enter_deleting_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), TAO_Notify::Routing_Slip_Persistence_Manager::remove(), Routing_Slip_Guard, rspm_, and rssDELETING. Referenced by at_front_of_persist_queue().
00797 {
00798 ++count_enter_deleting_;
00799 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00800 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state DELETING\n"),
00801 this->sequence_
00802 ));
00803 this->state_ = rssDELETING;
00804 guard.release ();
00805 this->rspm_->remove ();
00806 guard.acquire (); // necessary?
00807 }
|
|
|
Definition at line 589 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), add_to_persist_queue(), count_enter_new_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssNEW. Referenced by route().
00590 {
00591 ++count_enter_new_;
00592 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00593 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state NEW\n"),
00594 this->sequence_
00595 ));
00596 this->state_ = rssNEW;
00597 add_to_persist_queue(guard);
00598 }
|
|
|
Definition at line 628 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_reloaded_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssRELOADED.
00629 {
00630 ++count_enter_reloaded_;
00631 ACE_UNUSED_ARG (guard);
00632 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00633 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state RELOADED\n"),
00634 this->sequence_
00635 ));
00636 this->state_ = rssRELOADED;
00637 }
|
|
|
Definition at line 704 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), count_enter_saved_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssSAVED. Referenced by persist_complete(), and reconnect().
00705 {
00706 ++count_enter_saved_;
00707 ACE_UNUSED_ARG (guard);
00708 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00709 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVED\n"),
00710 this->sequence_
00711 ));
00712 this->state_ = rssSAVED;
00713 }
|
|
|
Definition at line 670 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), ACE_OutputCDR::begin(), TAO_Notify::Routing_Slip_Queue::complete(), count_enter_saving_, create_persistence_manager(), DEBUG_LEVEL, enter_state_transient(), LM_DEBUG, marshal(), persistent_queue_, ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rspm_, rssSAVING, and TAO_Notify::Routing_Slip_Persistence_Manager::store(). Referenced by at_front_of_persist_queue().
00671 {
00672 ++count_enter_saving_;
00673 if (!create_persistence_manager ())
00674 {
00675 // Note This should actually be a throw (out of memory)
00676 // but we cheat and make this a transient event.
00677 this->persistent_queue_.complete ();
00678 enter_state_transient (guard);
00679 }
00680 else
00681 {
00682 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00683 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVING\n"),
00684 this->sequence_
00685 ));
00686 this->state_ = rssSAVING;
00687
00688 TAO_OutputCDR event_cdr;
00689 this->event_->marshal (event_cdr);
00690
00691 const ACE_Message_Block *event_mb = event_cdr.begin ();
00692 TAO_OutputCDR rs_cdr;
00693 marshal (rs_cdr);
00694 const ACE_Message_Block *rs_mb = rs_cdr.begin ();
00695
00696 guard.release ();
00697 this->rspm_->store (*event_mb, *rs_mb);
00698
00699 guard.acquire (); // necessary?
00700 }
00701 }
|
|
|
Definition at line 810 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), count_enter_terminal_, DEBUG_LEVEL, LM_DEBUG, ACE_Strong_Bound_Ptr< X, ACE_LOCK >::reset(), Routing_Slip_Guard, rssTERMINAL, and this_ptr_. Referenced by at_front_of_persist_queue(), continue_state_transient(), enter_state_transient(), and persist_complete().
00811 {
00812 ++count_enter_terminal_;
00813 ACE_UNUSED_ARG (guard);
00814 ACE_ASSERT( this->is_safe_);
00815 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00816 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TERMINAL\n"),
00817 this->sequence_
00818 ));
00819 this->state_ = rssTERMINAL;
00820 this->this_ptr_.reset ();
00821 }
|
|
|
Definition at line 640 of file Routing_Slip.cpp. References ACE_DEBUG, ACE_TEXT(), all_deliveries_complete(), count_enter_transient_, DEBUG_LEVEL, enter_state_terminal(), is_safe_, LM_DEBUG, Routing_Slip_Guard, rssTRANSIENT, and until_safe_. Referenced by enter_state_saving(), and route().
00641 {
00642 ++count_enter_transient_;
00643 ACE_UNUSED_ARG (guard);
00644 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00645 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TRANSIENT\n"),
00646 this->sequence_
00647 ));
00648 this->state_ = rssTRANSIENT;
00649 if (! is_safe_)
00650 {
00651 is_safe_ = true;
00652 this->until_safe_.signal ();
00653 }
00654 if (all_deliveries_complete ())
00655 {
00656 enter_state_terminal (guard);
00657 }
00658 }
|
|
|
Definition at line 716 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), ACE_OutputCDR::begin(), count_enter_updating_, DEBUG_LEVEL, LM_DEBUG, marshal(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rspm_, rssUPDATING, and TAO_Notify::Routing_Slip_Persistence_Manager::update(). Referenced by at_front_of_persist_queue().
00717 {
00718 ++count_enter_updating_;
00719 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00720 ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state UPDATING\n"),
00721 this->sequence_
00722 ));
00723 this->state_ = rssUPDATING;
00724
00725 TAO_OutputCDR rs_cdr;
00726 marshal (rs_cdr);
00727 const ACE_Message_Block *rs_mb = rs_cdr.begin ();
00728 guard.release ();
00729
00730 ACE_ASSERT (this->rspm_ != 0);
00731 this->rspm_->update (*rs_mb);
00732 guard.acquire (); // necessary?
00733 }
|
|
|
Definition at line 216 of file Routing_Slip.cpp. Referenced by create().
00217 {
00218 return this->event_;
00219 }
|
|
|
Marshal into a CDR.
Definition at line 824 of file Routing_Slip.cpp. References delivery_requests_, ACE_Array_Base< T >::get(), TAO_Notify::Delivery_Request::marshal(), ACE_Vector< T, DEFAULT_SIZE >::size(), and ACE_OutputCDR::write_ulong(). Referenced by enter_state_saving(), and enter_state_updating().
00825 {
00826 size_t request_count = this->delivery_requests_.size();
00827 cdr.write_ulong (request_count - this->complete_requests_);
00828 for (size_t nreq = 0; nreq < request_count; ++nreq)
00829 {
00830 Delivery_Request * request = this->delivery_requests_[nreq].get ();
00831 if (request != 0)
00832 {
00833 request->marshal (cdr);
00834 }
00835 }
00836 }
|
|
|
The persistent storage has completed the last request.
Implements TAO_Notify::Persistent_Callback. Definition at line 507 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, ACE_TEXT(), TAO_Notify::Routing_Slip_Queue::complete(), DEBUG_LEVEL, enter_state_changed(), enter_state_saved(), enter_state_terminal(), is_safe_, LM_DEBUG, LM_ERROR, ACE_Guard< ACE_LOCK >::locked(), persistent_queue_, Routing_Slip_Guard, TAO_Notify::Routing_Slip_Ptr, rssCHANGED_WHILE_SAVING, rssDELETING, rssSAVING, rssUPDATING, and until_safe_.
00508 {
00509 // keep this object around til this method returns.
00510 Routing_Slip_Ptr me(this->this_ptr_);
00511 Routing_Slip_Guard guard (this->internals_);
00512 ACE_ASSERT (guard.locked ());
00513
00514 // allow the ConsumerProxy to return from the CORBA push call.
00515 if (! is_safe_)
00516 {
00517 is_safe_ = true;
00518 this->until_safe_.signal ();
00519 }
00520
00521 State state = this->state_;
00522 switch (state)
00523 {
00524 case rssSAVING:
00525 {
00526 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00527 ACE_TEXT ("(%P|%t) Routing Slip #%d: SAVING persist complete\n"),
00528 this->sequence_
00529 ));
00530 enter_state_saved(guard);
00531 break;
00532 }
00533 case rssCHANGED_WHILE_SAVING:
00534 {
00535 enter_state_changed (guard);
00536 break;
00537 }
00538 case rssUPDATING:
00539 {
00540 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00541 ACE_TEXT ("(%P|%t) Routing Slip #%d: UPDATING persist complete\n"),
00542 this->sequence_
00543 ));
00544 enter_state_saved (guard);
00545 break;
00546 }
00547 case rssDELETING:
00548 {
00549 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00550 ACE_TEXT ("(%P|%t) Routing Slip #%d: DELETING persist complete\n"),
00551 this->sequence_
00552 ));
00553 enter_state_terminal (guard);
00554 break;
00555 }
00556 default:
00557 {
00558 ACE_ERROR ((LM_ERROR,
00559 ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected transition in state %d\n"),
00560 static_cast<int> (this->state_)
00561 ));
00562 break;
00563 }
00564 }
00565 this->persistent_queue_.complete ();
00566 }
|
|
|
Definition at line 902 of file Routing_Slip.cpp. References ACE_CHECK, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_Vector< T, DEFAULT_SIZE >::clear(), delivery_methods_, enter_state_saved(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, and ACE_Vector< T, DEFAULT_SIZE >::size().
00903 {
00904 Routing_Slip_Guard guard (this->internals_);
00905 enter_state_saved (guard);
00906 guard.release ();
00907 //@@todo is there a worker_task available to do this?
00908 size_t count = this->delivery_methods_.size ();
00909 for (size_t nmethod = 0; nmethod < count; ++nmethod)
00910 {
00911 this->delivery_methods_[nmethod]->execute (ACE_ENV_SINGLE_ARG_PARAMETER);
00912 ACE_CHECK;
00913 }
00914 this->delivery_methods_.clear ();
00915 }
|
|
||||||||||||
|
Route this event to destinations. This must be the first action requested after the routing slip is created.
Definition at line 232 of file Routing_Slip.cpp. References ACE_ASSERT, ACE_DEBUG, ACE_ENV_ARG_PARAMETER, ACE_TEXT(), DEBUG_LEVEL, TAO_Notify::Delivery_Request_Ptr, delivery_requests_, enter_state_new(), enter_state_transient(), TAO_Notify_Object::execute_task(), LM_DEBUG, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rssCREATING, and ACE_Vector< T, DEFAULT_SIZE >::size().
00233 {
00234 ACE_ASSERT(pc != 0);
00235
00236 TAO_Notify_ProxyConsumer::Ptr pcgrd(pc);
00237
00238 Routing_Slip_Guard guard (this->internals_);
00239
00240 size_t request_id = delivery_requests_.size ();
00241
00242 if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00243 ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: lookup, completed %d of %d\n"),
00244 this->sequence_,
00245 static_cast<int> (request_id),
00246 static_cast<int> (this->complete_requests_),
00247 static_cast<int> (this->delivery_requests_.size ())
00248 ));
00249
00250 Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00251 this->delivery_requests_.push_back (request);
00252 TAO_Notify_Method_Request_Lookup_Queueable method (request, pc);
00253
00254 if (this->state_ == rssCREATING)
00255 {
00256 if (! reliable_channel)
00257 {
00258 enter_state_transient (guard);
00259 }
00260 else if (ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence") == 0)
00261 {
00262 enter_state_transient (guard);
00263 }
00264 else if (! this->event_->reliable().is_valid())
00265 {
00266 enter_state_new (guard);
00267 }
00268 else if (this->event_->reliable().value() == CosNotification::Persistent)
00269 {
00270 enter_state_new (guard);
00271 }
00272 else
00273 {
00274 enter_state_transient (guard);
00275 }
00276 }
00277 guard.release ();
00278 pc->execute_task (method ACE_ENV_ARG_PARAMETER);
00279 }
|
|
|
Provide an identifying number for this Routing Slip to use in debug messages.
Definition at line 918 of file Routing_Slip.cpp. References sequence_.
00919 {
00920 return this->sequence_;
00921 }
|
|
|
Definition at line 161 of file Routing_Slip.cpp. References rspm_, and TAO_Notify::Routing_Slip_Persistence_Manager::set_callback(). Referenced by create_persistence_manager().
|
|
|
Should delivery of this event be retried if it fails?
Definition at line 924 of file Routing_Slip.cpp. References rssTRANSIENT.
00925 {
00926 // simple minded test: if it's transient, don't retry it
00927 // @@todo Eventually this should check timeout, discard policy, etc.
00928 return this->state_ != rssTRANSIENT;
00929 }
|
|
||||||||||||
|
Unmarshal from CDR.
Definition at line 839 of file Routing_Slip.cpp. References ACE_CATCHANY, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_NEW_THROW_EX, ACE_TRY, ACE_TRY_CHECK, delivery_methods_, TAO_Notify::Delivery_Request_Ptr, delivery_requests_, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_InputCDR::read_octet(), ACE_InputCDR::read_ulong(), ACE_Vector< T, DEFAULT_SIZE >::size(), this_ptr_, TAO_Notify_Method_Request_Lookup::unmarshal(), and TAO_Notify_Method_Request_Dispatch::unmarshal().
00840 {
00841 CORBA::ULong count = 0;
00842 cdr.read_ulong (count);
00843 for (size_t nreq = 0; nreq < count; ++nreq)
00844 {
00845 ACE_CDR::Octet code = 0;
00846 while (cdr.read_octet(code))
00847 {
00848 ACE_DECLARE_NEW_ENV;
00849 ACE_TRY
00850 {
00851 if (code == TAO_Notify_Method_Request_Dispatch::persistence_code)
00852 {
00853 Delivery_Request * prequest;
00854 ACE_NEW_THROW_EX (
00855 prequest,
00856 Delivery_Request(this_ptr_, this->delivery_requests_.size ()),
00857 CORBA::NO_MEMORY ());
00858 ACE_TRY_CHECK;
00859 Delivery_Request_Ptr request(prequest);
00860 TAO_Notify_Method_Request_Dispatch_Queueable * method =
00861 TAO_Notify_Method_Request_Dispatch::unmarshal (
00862 request,
00863 ecf,
00864 cdr
00865 ACE_ENV_ARG_PARAMETER);
00866 ACE_TRY_CHECK;
00867 if (method != 0)
00868 {
00869 this->delivery_requests_.push_back (request);
00870 this->delivery_methods_.push_back (method);
00871 }
00872 }
00873 else if (code == TAO_Notify_Method_Request_Lookup::persistence_code)
00874 {
00875 Delivery_Request_Ptr request(new Delivery_Request(this_ptr_, this->delivery_requests_.size ()));
00876 TAO_Notify_Method_Request_Lookup_Queueable * method =
00877 TAO_Notify_Method_Request_Lookup::unmarshal (
00878 request,
00879 ecf,
00880 cdr
00881 ACE_ENV_ARG_PARAMETER);
00882 ACE_TRY_CHECK
00883 if (method != 0)
00884 {
00885 this->delivery_requests_.push_back (request);
00886 this->delivery_methods_.push_back (method);
00887 }
00888 }
00889 }
00890 ACE_CATCHANY;
00891 {
00892 // @@todo should we log this?
00893 // just ignore failures
00894 }
00895 ACE_ENDTRY;
00896 }
00897 }
00898 return this->delivery_requests_.size () > 0;
00899 }
|
|
|
Wait until the event/routing_slip has been saved at least once.
Definition at line 222 of file Routing_Slip.cpp. References is_safe_, Routing_Slip_Guard, and until_safe_.
00223 {
00224 Routing_Slip_Guard guard (this->internals_);
00225 while (!this->is_safe_)
00226 {
00227 this->until_safe_.wait ();
00228 }
00229 }
|
|
|
How many delivery requests are complete.
Definition at line 208 of file Routing_Slip.h. Referenced by all_deliveries_complete(), and delivery_request_complete(). |
|
|
Definition at line 50 of file Routing_Slip.cpp. Referenced by continue_state_changed(), and create(). |
|
|
Definition at line 48 of file Routing_Slip.cpp. Referenced by create(). |
|
|
Definition at line 42 of file Routing_Slip.cpp. Referenced by continue_state_new(), and create(). |
|
|
Definition at line 39 of file Routing_Slip.cpp. Referenced by continue_state_transient(), and create(). |
|
|
Definition at line 49 of file Routing_Slip.cpp. Referenced by create(), and enter_state_changed(). |
|
|
Definition at line 47 of file Routing_Slip.cpp. Referenced by create(), and enter_state_changed_while_saving(). |
|
|
Definition at line 51 of file Routing_Slip.cpp. Referenced by create(), and enter_state_complete(). |
|
|
Definition at line 43 of file Routing_Slip.cpp. Referenced by create(), and enter_state_complete_while_new(). |
|
|
Definition at line 52 of file Routing_Slip.cpp. Referenced by create(), and enter_state_deleting(). |
|
|
Definition at line 41 of file Routing_Slip.cpp. Referenced by create(), and enter_state_new(). |
|
|
Definition at line 40 of file Routing_Slip.cpp. Referenced by create(), and enter_state_reloaded(). |
|
|
Definition at line 45 of file Routing_Slip.cpp. Referenced by create(), and enter_state_saved(). |
|
|
Definition at line 44 of file Routing_Slip.cpp. Referenced by create(), and enter_state_saving(). |
|
|
Definition at line 53 of file Routing_Slip.cpp. Referenced by create(), and enter_state_terminal(). |
|
|
Definition at line 38 of file Routing_Slip.cpp. Referenced by create(), and enter_state_transient(). |
|
|
Definition at line 46 of file Routing_Slip.cpp. Referenced by create(), and enter_state_updating(). |
|
|
Methods that should be restarted during event recovery.
Definition at line 205 of file Routing_Slip.h. Referenced by reconnect(), and unmarshal(). |
|
|
A collection of delivery requests.
Definition at line 202 of file Routing_Slip.h. Referenced by all_deliveries_complete(), delivery_request_complete(), dispatch(), marshal(), route(), and unmarshal(). |
|
|
Definition at line 180 of file Routing_Slip.h. |
|
|
Protection for internal information.
Definition at line 168 of file Routing_Slip.h. |
|
|
true when event persistence qos is guaranteed
Definition at line 170 of file Routing_Slip.h. Referenced by enter_state_complete_while_new(), enter_state_transient(), persist_complete(), and wait_persist(). |
|
|
Referenced by add_to_persist_queue(), at_front_of_persist_queue(), enter_state_saving(), and persist_complete(). |
|
|
Definition at line 37 of file Routing_Slip.cpp. Referenced by Routing_Slip(). |
|
|
Pointer to a Routing_Slip_Persistence_Manager.
Definition at line 211 of file Routing_Slip.h. Referenced by create_persistence_manager(), enter_state_deleting(), enter_state_saving(), enter_state_updating(), and set_rspm(). |
|
|
Definition at line 213 of file Routing_Slip.h. Referenced by Routing_Slip(), and sequence(). |
|
|
Definition at line 36 of file Routing_Slip.cpp. Referenced by Routing_Slip(). |
|
|
A mini-state machine to control persistence. See external doc for circles and arrows. |
|
|
Smart pointer to this object. Provides continuity between smart pointers and "Routing_Slip::this". Also lets the Routing_Slip manage its own minimum lifetime.
Definition at line 177 of file Routing_Slip.h. Referenced by enter_state_terminal(), and unmarshal(). |
|
|
signalled when is_safe_ goes true
Definition at line 172 of file Routing_Slip.h. Referenced by enter_state_complete_while_new(), enter_state_transient(), persist_complete(), and wait_persist(). |
1.3.6