Class which manages the delivery of events to destination. More...
#include <Routing_Slip.h>
Public Member Functions | |
void | set_rspm (Routing_Slip_Persistence_Manager *rspm) |
void | reconnect (void) |
virtual | ~Routing_Slip () |
destructor (should be private but that inspires compiler wars) | |
void | route (TAO_Notify_ProxyConsumer *pc, bool reliable_channel) |
void | dispatch (TAO_Notify_ProxySupplier *proxy_supplier, bool filter) |
Schedule delivery to a consumer via a proxy supplier. | |
void | wait_persist () |
Wait until the event/routing_slip has been saved at least once. | |
void | delivery_request_complete (size_t request_id) |
A delivery request has been satisfied. | |
void | at_front_of_persist_queue () |
This Routing_Slip reached the front of the persistence queue. | |
virtual void | persist_complete () |
The persistent storage has completed the last request. | |
const TAO_Notify_Event::Ptr & | event () const |
int | sequence () const |
Provide an identifying number for this Routing Slip to use in debug messages. | |
bool | should_retry () const |
Should delivery of this event be retried if it fails? | |
Static Public Member Functions | |
static Routing_Slip_Ptr | create (const TAO_Notify_Event::Ptr &event) |
"Factory" method for normal use. | |
static Routing_Slip_Ptr | create (TAO_Notify_EventChannelFactory &ecf, Routing_Slip_Persistence_Manager *rspm) |
"Factory" method for use during reload from persistent storage. | |
Private Types | |
enum | State { rssCREATING, rssTRANSIENT, rssRELOADED, rssNEW, rssCOMPLETE_WHILE_NEW, rssSAVING, rssSAVED, rssUPDATING, rssCHANGED_WHILE_SAVING, rssCHANGED, rssCOMPLETE, rssDELETING, rssTERMINAL } |
typedef ACE_Guard < TAO_SYNCH_MUTEX > | Routing_Slip_Guard |
Private Member Functions | |
void | enter_state_transient (Routing_Slip_Guard &guard) |
void | continue_state_transient (Routing_Slip_Guard &guard) |
void | enter_state_reloaded (Routing_Slip_Guard &guard) |
void | enter_state_new (Routing_Slip_Guard &guard) |
void | continue_state_new (Routing_Slip_Guard &guard) |
void | enter_state_complete_while_new (Routing_Slip_Guard &guard) |
void | enter_state_saving (Routing_Slip_Guard &guard) |
void | enter_state_saved (Routing_Slip_Guard &guard) |
void | enter_state_updating (Routing_Slip_Guard &guard) |
void | enter_state_changed_while_saving (Routing_Slip_Guard &guard) |
void | continue_state_changed_while_saving (Routing_Slip_Guard &guard) |
void | enter_state_changed (Routing_Slip_Guard &guard) |
void | continue_state_changed (Routing_Slip_Guard &guard) |
void | enter_state_complete (Routing_Slip_Guard &guard) |
void | enter_state_deleting (Routing_Slip_Guard &guard) |
void | enter_state_terminal (Routing_Slip_Guard &guard) |
bool | create_persistence_manager () |
Routing_Slip (const TAO_Notify_Event::Ptr &event) | |
Private constructor for use by create method. | |
bool | all_deliveries_complete () const |
Test to see if all deliveries are complete. | |
void | add_to_persist_queue (Routing_Slip_Guard &guard) |
This routing_slip needs to be saved. | |
void | marshal (TAO_OutputCDR &cdr) |
Marshal into a CDR. | |
bool | unmarshal (TAO_Notify_EventChannelFactory &ecf, TAO_InputCDR &rscdr) |
Unmarshal from a CDR. | |
Private Attributes | |
TAO_SYNCH_MUTEX | internals_ |
Protection for internal information. | |
bool | is_safe_ |
true when event persistence qos is guaranteed | |
ACE_SYNCH_CONDITION | until_safe_ |
signalled when is_safe_ goes true | |
Routing_Slip_Ptr | this_ptr_ |
TAO_Notify_Event::Ptr | event_ |
enum TAO_Notify::Routing_Slip::State | state_ |
Delivery_Request_Vec | delivery_requests_ |
A collection of delivery requests. | |
Delivery_Method_Vec | delivery_methods_ |
Methods that should be restarted during event recovery. | |
size_t | complete_requests_ |
How many delivery requests are complete. | |
Routing_Slip_Persistence_Manager * | rspm_ |
Pointer to a Routing_Slip_Persistence_Manager. | |
int | sequence_ |
Static Private Attributes | |
static TAO_SYNCH_MUTEX | sequence_lock_ |
static int | routing_slip_sequence_ = 0 |
static size_t | count_enter_transient_ = 0 |
static size_t | count_continue_transient_ = 0 |
static size_t | count_enter_reloaded_ = 0 |
static size_t | count_enter_new_ = 0 |
static size_t | count_continue_new_ = 0 |
static size_t | count_enter_complete_while_new_ = 0 |
static size_t | count_enter_saving_ = 0 |
static size_t | count_enter_saved_ = 0 |
static size_t | count_enter_updating_ = 0 |
static size_t | count_enter_changed_while_saving_ = 0 |
static size_t | count_continue_changed_while_saving_ = 0 |
static size_t | count_enter_changed_ = 0 |
static size_t | count_continue_changed_ = 0 |
static size_t | count_enter_complete_ = 0 |
static size_t | count_enter_deleting_ = 0 |
static size_t | count_enter_terminal_ = 0 |
static Routing_Slip_Queue | persistent_queue_ |
Class which manages the delivery of events to destination.
Interacts with persistent storage to provide reliable delivery.
Definition at line 66 of file Routing_Slip.h.
typedef ACE_Guard< TAO_SYNCH_MUTEX > TAO_Notify::Routing_Slip::Routing_Slip_Guard [private] |
Definition at line 68 of file Routing_Slip.h.
enum TAO_Notify::Routing_Slip::State [private] |
A mini-state machine to control persistence. See external doc for circles and arrows.
rssCREATING | |
rssTRANSIENT | |
rssRELOADED | |
rssNEW | |
rssCOMPLETE_WHILE_NEW | |
rssSAVING | |
rssSAVED | |
rssUPDATING | |
rssCHANGED_WHILE_SAVING | |
rssCHANGED | |
rssCOMPLETE | |
rssDELETING | |
rssTERMINAL |
Definition at line 183 of file Routing_Slip.h.
{ rssCREATING, rssTRANSIENT, rssRELOADED, rssNEW, rssCOMPLETE_WHILE_NEW, rssSAVING, rssSAVED, rssUPDATING, rssCHANGED_WHILE_SAVING, rssCHANGED, rssCOMPLETE, rssDELETING, rssTERMINAL } state_;
TAO_Notify::Routing_Slip::~Routing_Slip | ( | ) | [virtual] |
destructor (should be private but that inspires compiler wars)
Definition at line 185 of file Routing_Slip.cpp.
{ if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: destructor\n"), this->sequence_ )); }
TAO_Notify::Routing_Slip::Routing_Slip | ( | const TAO_Notify_Event::Ptr & | event | ) | [private] |
Private constructor for use by create method.
Definition at line 167 of file Routing_Slip.cpp.
: is_safe_ (false) , until_safe_ (internals_) , this_ptr_ (0) , event_(event) , state_ (rssCREATING) , complete_requests_ (0) , rspm_ (0) { Routing_Slip_Guard guard (sequence_lock_); this->sequence_ = ++routing_slip_sequence_; if (DEBUG_LEVEL > 1) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: constructor\n"), this->sequence_ )); }
void TAO_Notify::Routing_Slip::add_to_persist_queue | ( | Routing_Slip_Guard & | guard | ) | [private] |
This routing_slip needs to be saved.
Definition at line 582 of file Routing_Slip.cpp.
{ guard.release (); this->persistent_queue_.add (this->this_ptr_); }
bool TAO_Notify::Routing_Slip::all_deliveries_complete | ( | ) | const [private] |
Test to see if all deliveries are complete.
Definition at line 576 of file Routing_Slip.cpp.
{ return this->complete_requests_ == this->delivery_requests_.size (); }
void TAO_Notify::Routing_Slip::at_front_of_persist_queue | ( | ) |
This Routing_Slip reached the front of the persistence queue.
Definition at line 453 of file Routing_Slip.cpp.
{ Routing_Slip_Guard guard (this->internals_); State state = this->state_; switch (state) { case rssNEW: { if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: NEW Reached front of queue\n"), this->sequence_ )); enter_state_saving (guard); break; } case rssCOMPLETE_WHILE_NEW: { if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE_WHILE_NEW Reached front of queue\n"), this->sequence_ )); guard.release (); this->persistent_queue_.complete (); enter_state_terminal (guard); break; } case rssCHANGED: { if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: CHANGED Reached front of queue\n"), this->sequence_ )); enter_state_updating (guard); break; } case rssCOMPLETE: { if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE Reached front of queue\n"), this->sequence_ )); enter_state_deleting (guard); break; } default: { ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Routing Slip %d: Unexpected at_front_of_persist_queue in state %d\n"), this->sequence_, static_cast<int> (this->state_) )); break; } } }
void TAO_Notify::Routing_Slip::continue_state_changed | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 783 of file Routing_Slip.cpp.
{ ++count_continue_changed_; if (all_deliveries_complete ()) { enter_state_complete (guard); } else { guard.release (); } }
void TAO_Notify::Routing_Slip::continue_state_changed_while_saving | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 758 of file Routing_Slip.cpp.
{
// no action necessary
guard.release ();
}
void TAO_Notify::Routing_Slip::continue_state_new | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 604 of file Routing_Slip.cpp.
{ ++count_continue_new_; if (all_deliveries_complete ()) { this->enter_state_complete_while_new (guard); } guard.release (); }
void TAO_Notify::Routing_Slip::continue_state_transient | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 668 of file Routing_Slip.cpp.
{ ++count_continue_transient_; if (all_deliveries_complete ()) { enter_state_terminal (guard); } else { guard.release (); } }
Routing_Slip_Ptr TAO_Notify::Routing_Slip::create | ( | const TAO_Notify_Event::Ptr & | event | ) | [static] |
"Factory" method for normal use.
Definition at line 57 of file Routing_Slip.cpp.
{ Routing_Slip * prs; ACE_NEW_THROW_EX (prs, Routing_Slip (event), CORBA::NO_MEMORY ()); Routing_Slip_Ptr result(prs); result->this_ptr_ = result; // let the pointers touch so they use the same ref count // note we don't care about ultra-precise stats, so no guard for these if (DEBUG_LEVEL > 8 && ((result->sequence_ % 100) == 0)) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Routing_Slip_Statistics\n") ACE_TEXT (" enter_transient \t%d\n") ACE_TEXT (" continue_transient \t%d\n") ACE_TEXT (" enter_reloaded \t%d\n") ACE_TEXT (" enter_new \t%d\n") ACE_TEXT (" continue_new \t%d\n") ACE_TEXT (" enter_complete_while_new \t%d\n") ACE_TEXT (" enter_saving \t%d\n") ACE_TEXT (" enter_saved \t%d\n") ACE_TEXT (" enter_updating \t%d\n") ACE_TEXT (" enter_changed_while_saving \t%d\n") ACE_TEXT (" continue_changed_while_saving\t%d\n") ACE_TEXT (" enter_changed \t%d\n") ACE_TEXT (" continue_changed \t%d\n") ACE_TEXT (" enter_complete \t%d\n") ACE_TEXT (" enter_deleting \t%d\n") ACE_TEXT (" enter_terminal \t%d\n") , static_cast<int> (count_enter_transient_) , static_cast<int> (count_continue_transient_) , static_cast<int> (count_enter_reloaded_) , static_cast<int> (count_enter_new_) , static_cast<int> (count_continue_new_) , static_cast<int> (count_enter_complete_while_new_) , static_cast<int> (count_enter_saving_) , static_cast<int> (count_enter_saved_) , static_cast<int> (count_enter_updating_) , static_cast<int> (count_enter_changed_while_saving_) , static_cast<int> (count_continue_changed_while_saving_) , static_cast<int> (count_enter_changed_) , static_cast<int> (count_continue_changed_) , static_cast<int> (count_enter_complete_) , static_cast<int> (count_enter_deleting_) , static_cast<int> (count_enter_terminal_) )); } return result; }
Routing_Slip_Ptr TAO_Notify::Routing_Slip::create | ( | TAO_Notify_EventChannelFactory & | ecf, | |
Routing_Slip_Persistence_Manager * | rspm | |||
) | [static] |
"Factory" method for use during reload from persistent storage.
Definition at line 108 of file Routing_Slip.cpp.
{ Routing_Slip_Ptr result; ACE_Message_Block * event_mb = 0; ACE_Message_Block * rs_mb = 0; try { if (rspm->reload (event_mb, rs_mb)) { TAO_InputCDR cdr_event (event_mb); TAO_Notify_Event::Ptr event (TAO_Notify_Event::unmarshal (cdr_event)); if (event.isSet()) { result = create (event); TAO_InputCDR cdr_rs (rs_mb); if ( result->unmarshal (ecf, cdr_rs)) { result->set_rspm (rspm); } else { ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for routing slip.\n") )); result.reset (); } } else { ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for event.\n") )); } } } catch (const CORBA::Exception&) { ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Routing_Slip::create: Exception reloading event.\n") )); } delete event_mb; delete rs_mb; return result; }
bool TAO_Notify::Routing_Slip::create_persistence_manager | ( | ) | [private] |
Definition at line 194 of file Routing_Slip.cpp.
{ if (this->rspm_ == 0) { Event_Persistence_Strategy * strategy = ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence"); if (strategy != 0) { Event_Persistence_Factory * factory = strategy->get_factory (); if (factory != 0) { set_rspm (factory->create_routing_slip_persistence_manager(this)); } } } return this->rspm_ != 0; }
void TAO_Notify::Routing_Slip::delivery_request_complete | ( | size_t | request_id | ) |
A delivery request has been satisfied.
Definition at line 388 of file Routing_Slip.cpp.
{ Routing_Slip_Guard guard (this->internals_); ACE_ASSERT (request_id < this->delivery_requests_.size ()); // reset the pointer to allow the delivery_request to be deleted. this->delivery_requests_[request_id].reset (); this->complete_requests_ += 1; if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: delivery_request_complete #%d: completed %d of %d\n"), this->sequence_, static_cast<int> (request_id), static_cast<int> (this->complete_requests_), static_cast<int> (this->delivery_requests_.size ()) )); State state = this->state_; switch (state) { case rssTRANSIENT: { continue_state_transient (guard); break; } case rssNEW: { continue_state_new (guard); break; } case rssSAVING: { enter_state_changed_while_saving (guard); break; } case rssUPDATING: { enter_state_changed_while_saving (guard); break; } case rssSAVED: { enter_state_changed (guard); break; } case rssCHANGED_WHILE_SAVING: { continue_state_changed_while_saving (guard); break; } case rssCHANGED: { continue_state_changed (guard); break; } default: { ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected delivery_request_complete in state %d\n"), static_cast<int> (this->state_) )); break; } } }
void TAO_Notify::Routing_Slip::dispatch | ( | TAO_Notify_ProxySupplier * | proxy_supplier, | |
bool | filter | |||
) |
Schedule delivery to a consumer via a proxy supplier.
proxy_supplier | the proxy supplier that will deliver the event | |
filter | should consumer-based filtering be applied? |
Definition at line 336 of file Routing_Slip.cpp.
{ // cannot be the first action ACE_ASSERT (this->state_ != rssCREATING); TAO_Notify_ProxySupplier::Ptr psgrd(ps); Routing_Slip_Guard guard (this->internals_); size_t request_id = delivery_requests_.size (); if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Dispatch %s; completed %d of %d\n"), this->sequence_, static_cast<int> (request_id), filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"), static_cast<int> (this->complete_requests_), static_cast<int> (this->delivery_requests_.size ()) )); Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id)); if (! ps->has_shutdown() ) { this->delivery_requests_.push_back (request); TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter); guard.release (); if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to " "proxy supplier %d\n", this->sequence_, static_cast<int> (request_id), ps->id())); ps->execute_task (method); } else { if (DEBUG_LEVEL > 5) ACE_DEBUG ((LM_DEBUG, "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to " "proxy supplier %d; already shut down\n", this->sequence_, static_cast<int> (request_id), ps->id())); } }
void TAO_Notify::Routing_Slip::enter_state_changed | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 765 of file Routing_Slip.cpp.
{ ++count_enter_changed_; if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED\n"), this->sequence_ )); // complete state change BEFORE initiating request to avoid // race condition if request finishes before state is stable. this->state_ = rssCHANGED; if (all_deliveries_complete ()) { enter_state_complete (guard); } add_to_persist_queue (guard); }
void TAO_Notify::Routing_Slip::enter_state_changed_while_saving | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 746 of file Routing_Slip.cpp.
{ ++count_enter_changed_while_saving_; if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED_WHILE_SAVING\n"), this->sequence_ )); this->state_ = rssCHANGED_WHILE_SAVING; guard.release (); }
void TAO_Notify::Routing_Slip::enter_state_complete | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 797 of file Routing_Slip.cpp.
{ ++count_enter_complete_; if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE\n"), this->sequence_ )); this->state_ = rssCOMPLETE; guard.release (); }
void TAO_Notify::Routing_Slip::enter_state_complete_while_new | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 614 of file Routing_Slip.cpp.
{ ++count_enter_complete_while_new_; ACE_UNUSED_ARG (guard); if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE_WHILE_NEW\n"), this->sequence_ )); // allow the ConsumerProxy to return from the CORBA push call. if (! is_safe_) { is_safe_ = true; this->until_safe_.signal (); } this->state_ = rssCOMPLETE_WHILE_NEW; }
void TAO_Notify::Routing_Slip::enter_state_deleting | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 809 of file Routing_Slip.cpp.
{ ++count_enter_deleting_; if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state DELETING\n"), this->sequence_ )); this->state_ = rssDELETING; guard.release (); this->rspm_->remove (); }
void TAO_Notify::Routing_Slip::enter_state_new | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 592 of file Routing_Slip.cpp.
{ ++count_enter_new_; if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state NEW\n"), this->sequence_ )); this->state_ = rssNEW; add_to_persist_queue(guard); }
void TAO_Notify::Routing_Slip::enter_state_reloaded | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 632 of file Routing_Slip.cpp.
{
  // Entry action for the RELOADED state: bump the transition counter,
  // optionally log, record the new state, then release the guard that the
  // caller passed in.
  ++count_enter_reloaded_;
  // The format specifier was "#&d", which printed a literal "&d" and never
  // consumed the sequence_ argument; it is now "#%d" like the other
  // enter_state_* debug messages.
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
    ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state RELOADED\n"),
    this->sequence_
    ));
  this->state_ = rssRELOADED;
  guard.release ();
}
void TAO_Notify::Routing_Slip::enter_state_saved | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 714 of file Routing_Slip.cpp.
{ ++count_enter_saved_; if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVED\n"), this->sequence_ )); this->state_ = rssSAVED; guard.release (); }
void TAO_Notify::Routing_Slip::enter_state_saving | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 681 of file Routing_Slip.cpp.
{ ++count_enter_saving_; if (!create_persistence_manager ()) { // Note This should actually be a throw (out of memory) // but we cheat and make this a transient event. guard.release (); this->persistent_queue_.complete (); enter_state_transient (guard); } else { if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVING\n"), this->sequence_ )); this->state_ = rssSAVING; TAO_OutputCDR event_cdr; this->event_->marshal (event_cdr); const ACE_Message_Block *event_mb = event_cdr.begin (); TAO_OutputCDR rs_cdr; marshal (rs_cdr); const ACE_Message_Block *rs_mb = rs_cdr.begin (); guard.release (); this->rspm_->store (*event_mb, *rs_mb); } }
void TAO_Notify::Routing_Slip::enter_state_terminal | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 822 of file Routing_Slip.cpp.
{ ++count_enter_terminal_; ACE_ASSERT( this->is_safe_); if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TERMINAL\n"), this->sequence_ )); this->state_ = rssTERMINAL; this->this_ptr_.reset (); guard.release (); }
void TAO_Notify::Routing_Slip::enter_state_transient | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 644 of file Routing_Slip.cpp.
{ ++count_enter_transient_; if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TRANSIENT\n"), this->sequence_ )); this->state_ = rssTRANSIENT; if (! is_safe_) { is_safe_ = true; this->until_safe_.signal (); } if (all_deliveries_complete ()) { enter_state_terminal (guard); } else { guard.release (); } }
void TAO_Notify::Routing_Slip::enter_state_updating | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 726 of file Routing_Slip.cpp.
{ ++count_enter_updating_; if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state UPDATING\n"), this->sequence_ )); this->state_ = rssUPDATING; TAO_OutputCDR rs_cdr; marshal (rs_cdr); const ACE_Message_Block *rs_mb = rs_cdr.begin (); guard.release (); ACE_ASSERT (this->rspm_ != 0); this->rspm_->update (*rs_mb); }
const TAO_Notify_Event::Ptr & TAO_Notify::Routing_Slip::event | ( | ) | const |
Definition at line 213 of file Routing_Slip.cpp.
{ return this->event_; }
void TAO_Notify::Routing_Slip::marshal | ( | TAO_OutputCDR & | cdr | ) | [private] |
Marshal into a CDR.
Definition at line 836 of file Routing_Slip.cpp.
{ size_t request_count = this->delivery_requests_.size(); cdr.write_ulong ( ACE_Utils::truncate_cast<CORBA::ULong> (request_count - this->complete_requests_)); for (size_t nreq = 0; nreq < request_count; ++nreq) { Delivery_Request * request = this->delivery_requests_[nreq].get (); if (request != 0) { request->marshal (cdr); } } }
void TAO_Notify::Routing_Slip::persist_complete | ( | ) | [virtual] |
The persistent storage has completed the last request.
Implements TAO_Notify::Persistent_Callback.
Definition at line 510 of file Routing_Slip.cpp.
{ // keep this object around til this method returns. Routing_Slip_Ptr me(this->this_ptr_); Routing_Slip_Guard guard (this->internals_); ACE_ASSERT (guard.locked ()); // allow the ConsumerProxy to return from the CORBA push call. if (! is_safe_) { is_safe_ = true; this->until_safe_.signal (); } State state = this->state_; switch (state) { case rssSAVING: { if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: SAVING persist complete\n"), this->sequence_ )); enter_state_saved(guard); break; } case rssCHANGED_WHILE_SAVING: { enter_state_changed (guard); break; } case rssUPDATING: { if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: UPDATING persist complete\n"), this->sequence_ )); enter_state_saved (guard); break; } case rssDELETING: { if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: DELETING persist complete\n"), this->sequence_ )); enter_state_terminal (guard); break; } default: { ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected transition in state %d\n"), static_cast<int> (this->state_) )); guard.release (); break; } } this->persistent_queue_.complete (); }
void TAO_Notify::Routing_Slip::reconnect | ( | void | ) |
Definition at line 908 of file Routing_Slip.cpp.
{ Routing_Slip_Guard guard (this->internals_); enter_state_saved (guard); //@@todo is there a worker_task available to do this? size_t count = this->delivery_methods_.size (); for (size_t nmethod = 0; nmethod < count; ++nmethod) { this->delivery_methods_[nmethod]->execute (); } this->delivery_methods_.clear (); }
void TAO_Notify::Routing_Slip::route | ( | TAO_Notify_ProxyConsumer * | pc, | |
bool | reliable_channel | |||
) |
Route this event to its destinations. This must be the first action requested after the routing slip is created.
Definition at line 229 of file Routing_Slip.cpp.
{ ACE_ASSERT(pc != 0); TAO_Notify_ProxyConsumer::Ptr pcgrd(pc); Routing_Slip_Guard guard (this->internals_); size_t request_id = delivery_requests_.size (); if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: lookup, completed %d of %d\n"), this->sequence_, static_cast<int> (request_id), static_cast<int> (this->complete_requests_), static_cast<int> (this->delivery_requests_.size ()) )); Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id)); this->delivery_requests_.push_back (request); TAO_Notify_Method_Request_Lookup_Queueable method (request, pc); if (this->state_ == rssCREATING) { if (! reliable_channel) { enter_state_transient (guard); } else if (ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence") == 0) { enter_state_transient (guard); } else if (! this->event_->reliable().is_valid()) { enter_state_new (guard); } else if (this->event_->reliable().value() == true) { enter_state_new (guard); } else { enter_state_transient (guard); } } else { // We only need to release the guard if the state is rssCREATING. // By calling enter_state_*, we are guaranteed that the guard has // been released. guard.release (); } pc->execute_task (method); }
int TAO_Notify::Routing_Slip::sequence | ( | ) | const |
Provide an identifying number for this Routing Slip to use in debug messages.
Definition at line 923 of file Routing_Slip.cpp.
{ return this->sequence_; }
void TAO_Notify::Routing_Slip::set_rspm | ( | Routing_Slip_Persistence_Manager * | rspm | ) |
Definition at line 158 of file Routing_Slip.cpp.
bool TAO_Notify::Routing_Slip::should_retry | ( | ) | const |
Should delivery of this event be retried if it fails?
Definition at line 929 of file Routing_Slip.cpp.
{ // simple minded test: if it's transient, don't retry it // @@todo Eventually this should check timeout, discard policy, etc. return this->state_ != rssTRANSIENT; }
bool TAO_Notify::Routing_Slip::unmarshal | ( | TAO_Notify_EventChannelFactory & | ecf, | |
TAO_InputCDR & | rscdr | |||
) | [private] |
Unmarshal from a CDR.
Definition at line 852 of file Routing_Slip.cpp.
{ CORBA::ULong count = 0; cdr.read_ulong (count); for (size_t nreq = 0; nreq < count; ++nreq) { ACE_CDR::Octet code = 0; while (cdr.read_octet(code)) { try { if (code == TAO_Notify_Method_Request_Dispatch::persistence_code) { Delivery_Request * prequest; ACE_NEW_THROW_EX ( prequest, Delivery_Request(this_ptr_, this->delivery_requests_.size ()), CORBA::NO_MEMORY ()); Delivery_Request_Ptr request(prequest); TAO_Notify_Method_Request_Dispatch_Queueable * method = TAO_Notify_Method_Request_Dispatch::unmarshal ( request, ecf, cdr); if (method != 0) { this->delivery_requests_.push_back (request); this->delivery_methods_.push_back (method); } } else if (code == TAO_Notify_Method_Request_Lookup::persistence_code) { Delivery_Request_Ptr request(new Delivery_Request(this_ptr_, this->delivery_requests_.size ())); TAO_Notify_Method_Request_Lookup_Queueable * method = TAO_Notify_Method_Request_Lookup::unmarshal ( request, ecf, cdr); if (method != 0) { this->delivery_requests_.push_back (request); this->delivery_methods_.push_back (method); } } } catch (const CORBA::Exception&) { // @@todo should we log this? // just ignore failures } } } return this->delivery_requests_.size () > 0; }
void TAO_Notify::Routing_Slip::wait_persist | ( | ) |
Wait until the event/routing_slip has been saved at least once.
Definition at line 219 of file Routing_Slip.cpp.
{ Routing_Slip_Guard guard (this->internals_); while (!this->is_safe_) { this->until_safe_.wait (); } }
size_t TAO_Notify::Routing_Slip::complete_requests_ [private] |
How many delivery requests are complete.
Definition at line 207 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_continue_changed_ = 0 [static, private] |
Definition at line 228 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_continue_changed_while_saving_ = 0 [static, private] |
Definition at line 226 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_continue_new_ = 0 [static, private] |
Definition at line 220 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_continue_transient_ = 0 [static, private] |
Definition at line 217 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_changed_ = 0 [static, private] |
Definition at line 227 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_changed_while_saving_ = 0 [static, private] |
Definition at line 225 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_complete_ = 0 [static, private] |
Definition at line 229 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_complete_while_new_ = 0 [static, private] |
Definition at line 221 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_deleting_ = 0 [static, private] |
Definition at line 230 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_new_ = 0 [static, private] |
Definition at line 219 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_reloaded_ = 0 [static, private] |
Definition at line 218 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_saved_ = 0 [static, private] |
Definition at line 223 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_saving_ = 0 [static, private] |
Definition at line 222 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_terminal_ = 0 [static, private] |
Definition at line 231 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_transient_ = 0 [static, private] |
Definition at line 216 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_updating_ = 0 [static, private] |
Definition at line 224 of file Routing_Slip.h.
Methods that should be restarted during event recovery.
Definition at line 204 of file Routing_Slip.h.
A collection of delivery requests.
Definition at line 201 of file Routing_Slip.h.
Definition at line 179 of file Routing_Slip.h.
TAO_SYNCH_MUTEX TAO_Notify::Routing_Slip::internals_ [private] |
Protection for internal information.
Definition at line 167 of file Routing_Slip.h.
bool TAO_Notify::Routing_Slip::is_safe_ [private] |
true when event persistence qos is guaranteed
Definition at line 169 of file Routing_Slip.h.
Routing_Slip_Queue TAO_Notify::Routing_Slip::persistent_queue_ [static, private] |
Definition at line 233 of file Routing_Slip.h.
int TAO_Notify::Routing_Slip::routing_slip_sequence_ = 0 [static, private] |
Definition at line 215 of file Routing_Slip.h.
Pointer to a Routing_Slip_Persistence_Manager.
Definition at line 210 of file Routing_Slip.h.
int TAO_Notify::Routing_Slip::sequence_ [private] |
Definition at line 212 of file Routing_Slip.h.
TAO_SYNCH_MUTEX TAO_Notify::Routing_Slip::sequence_lock_ [static, private] |
Definition at line 214 of file Routing_Slip.h.
enum TAO_Notify::Routing_Slip::State TAO_Notify::Routing_Slip::state_ [private] |
A mini-state machine to control persistence. See external doc for circles and arrows.
Smart pointer to this object. Provides continuity between smart pointers and "Routing_Slip::this". Also lets the Routing_Slip manage its own minimum lifetime.
Definition at line 176 of file Routing_Slip.h.
ACE_SYNCH_CONDITION TAO_Notify::Routing_Slip::until_safe_ [private] |
signalled when is_safe_ goes true
Definition at line 171 of file Routing_Slip.h.