Class which manages the delivery of events to destination. More...
#include <Routing_Slip.h>


Public Member Functions | |
| void | set_rspm (Routing_Slip_Persistence_Manager *rspm) |
| void | reconnect (void) |
| virtual | ~Routing_Slip () |
| destructor (should be private but that inspires compiler wars) | |
| void | route (TAO_Notify_ProxyConsumer *pc, bool reliable_channel) |
| void | dispatch (TAO_Notify_ProxySupplier *proxy_supplier, bool filter) |
| Schedule delivery to a consumer via a proxy supplier. | |
| void | wait_persist () |
| Wait until the event/routing_slip has been saved at least once. | |
| void | delivery_request_complete (size_t request_id) |
| A delivery request has been satisfied. | |
| void | at_front_of_persist_queue () |
| This Routing_Slip reached the front of the persistence queue. | |
| virtual void | persist_complete () |
| The persistent storage has completed the last request. | |
| const TAO_Notify_Event::Ptr & | event () const |
| int | sequence () const |
| Provide an identifying number for this Routing Slip to use in debug messages. | |
| bool | should_retry () const |
| Should delivery of this event be retried if it fails? | |
Static Public Member Functions | |
| static Routing_Slip_Ptr | create (const TAO_Notify_Event::Ptr &event) |
| "Factory" method for normal use. | |
| static Routing_Slip_Ptr | create (TAO_Notify_EventChannelFactory &ecf, Routing_Slip_Persistence_Manager *rspm) |
| "Factory" method for use during reload from persistent storage. | |
Private Types | |
| enum | State { rssCREATING, rssTRANSIENT, rssRELOADED, rssNEW, rssCOMPLETE_WHILE_NEW, rssSAVING, rssSAVED, rssUPDATING, rssCHANGED_WHILE_SAVING, rssCHANGED, rssCOMPLETE, rssDELETING, rssTERMINAL } |
| typedef ACE_Guard < TAO_SYNCH_MUTEX > | Routing_Slip_Guard |
Private Member Functions | |
| void | enter_state_transient (Routing_Slip_Guard &guard) |
| void | continue_state_transient (Routing_Slip_Guard &guard) |
| void | enter_state_reloaded (Routing_Slip_Guard &guard) |
| void | enter_state_new (Routing_Slip_Guard &guard) |
| void | continue_state_new (Routing_Slip_Guard &guard) |
| void | enter_state_complete_while_new (Routing_Slip_Guard &guard) |
| void | enter_state_saving (Routing_Slip_Guard &guard) |
| void | enter_state_saved (Routing_Slip_Guard &guard) |
| void | enter_state_updating (Routing_Slip_Guard &guard) |
| void | enter_state_changed_while_saving (Routing_Slip_Guard &guard) |
| void | continue_state_changed_while_saving (Routing_Slip_Guard &guard) |
| void | enter_state_changed (Routing_Slip_Guard &guard) |
| void | continue_state_changed (Routing_Slip_Guard &guard) |
| void | enter_state_complete (Routing_Slip_Guard &guard) |
| void | enter_state_deleting (Routing_Slip_Guard &guard) |
| void | enter_state_terminal (Routing_Slip_Guard &guard) |
| bool | create_persistence_manager () |
| Routing_Slip (const TAO_Notify_Event::Ptr &event) | |
| Private constructor for use by create method. | |
| bool | all_deliveries_complete () const |
| Test to see if all deliveries are complete. | |
| void | add_to_persist_queue (Routing_Slip_Guard &guard) |
| This routing_slip needs to be saved. | |
| void | marshal (TAO_OutputCDR &cdr) |
| Marshal into a CDR. | |
| bool | unmarshal (TAO_Notify_EventChannelFactory &ecf, TAO_InputCDR &rscdr) |
| Unmarshal from a CDR. | |
Private Attributes | |
| TAO_SYNCH_MUTEX | internals_ |
| Protection for internal information. | |
| bool | is_safe_ |
| true when event persistence qos is guaranteed | |
| ACE_SYNCH_CONDITION | until_safe_ |
| signalled when is_safe_ goes true | |
| Routing_Slip_Ptr | this_ptr_ |
| TAO_Notify_Event::Ptr | event_ |
| enum TAO_Notify::Routing_Slip::State | state_ |
| Delivery_Request_Vec | delivery_requests_ |
| A collection of delivery requests. | |
| Delivery_Method_Vec | delivery_methods_ |
| Methods that should be restarted during event recovery. | |
| size_t | complete_requests_ |
| How many delivery requests are complete. | |
| Routing_Slip_Persistence_Manager * | rspm_ |
| Pointer to a Routing_Slip_Persistence_Manager. | |
| int | sequence_ |
Static Private Attributes | |
| static TAO_SYNCH_MUTEX | sequence_lock_ |
| static int | routing_slip_sequence_ = 0 |
| static size_t | count_enter_transient_ = 0 |
| static size_t | count_continue_transient_ = 0 |
| static size_t | count_enter_reloaded_ = 0 |
| static size_t | count_enter_new_ = 0 |
| static size_t | count_continue_new_ = 0 |
| static size_t | count_enter_complete_while_new_ = 0 |
| static size_t | count_enter_saving_ = 0 |
| static size_t | count_enter_saved_ = 0 |
| static size_t | count_enter_updating_ = 0 |
| static size_t | count_enter_changed_while_saving_ = 0 |
| static size_t | count_continue_changed_while_saving_ = 0 |
| static size_t | count_enter_changed_ = 0 |
| static size_t | count_continue_changed_ = 0 |
| static size_t | count_enter_complete_ = 0 |
| static size_t | count_enter_deleting_ = 0 |
| static size_t | count_enter_terminal_ = 0 |
| static Routing_Slip_Queue | persistent_queue_ |
Class which manages the delivery of events to their destinations.
Interacts with persistent storage to provide reliable delivery.
Definition at line 66 of file Routing_Slip.h.
typedef ACE_Guard< TAO_SYNCH_MUTEX > TAO_Notify::Routing_Slip::Routing_Slip_Guard [private] |
Definition at line 68 of file Routing_Slip.h.
enum TAO_Notify::Routing_Slip::State [private] |
A mini-state machine to control persistence. See the external documentation for circles and arrows.
| rssCREATING | |
| rssTRANSIENT | |
| rssRELOADED | |
| rssNEW | |
| rssCOMPLETE_WHILE_NEW | |
| rssSAVING | |
| rssSAVED | |
| rssUPDATING | |
| rssCHANGED_WHILE_SAVING | |
| rssCHANGED | |
| rssCOMPLETE | |
| rssDELETING | |
| rssTERMINAL |
Definition at line 183 of file Routing_Slip.h.
{
// Initial state assigned by the constructor; route() chooses the first real state from here.
rssCREATING,
// Event is not being persisted; should_retry() returns false in this state.
rssTRANSIENT,
// Set by enter_state_reloaded().
rssRELOADED,
// Queued for its first save (enter_state_new() adds the slip to the persist queue).
rssNEW,
// All deliveries completed while the slip was still NEW, i.e. before it was saved.
rssCOMPLETE_WHILE_NEW,
// A store() request has been issued to the persistence manager.
rssSAVING,
// The last store/update request completed (see persist_complete()).
rssSAVED,
// An update() request has been issued to the persistence manager.
rssUPDATING,
// A delivery completed while a store/update was still outstanding.
rssCHANGED_WHILE_SAVING,
// The slip changed after being saved and is queued for an update.
rssCHANGED,
// All deliveries are complete; the slip is deleted when it reaches the front of the queue.
rssCOMPLETE,
// A remove() request has been issued to the persistence manager.
rssDELETING,
// Final state: the slip has dropped its self-reference (this_ptr_).
rssTERMINAL
} state_;
| TAO_Notify::Routing_Slip::~Routing_Slip | ( | ) | [virtual] |
destructor (should be private but that inspires compiler wars)
Definition at line 185 of file Routing_Slip.cpp.
{
  // Nothing to clean up explicitly; just trace destruction at high debug levels.
  if (DEBUG_LEVEL > 8)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%P|%t) Routing Slip #%d: destructor\n"),
                  this->sequence_));
    }
}
| TAO_Notify::Routing_Slip::Routing_Slip | ( | const TAO_Notify_Event::Ptr & | event | ) | [private] |
Private constructor for use by create method.
Definition at line 167 of file Routing_Slip.cpp.
: is_safe_ (false)
, until_safe_ (internals_)      // condition variable bound to this slip's own mutex
, this_ptr_ (0)                 // filled in by create() once a smart pointer exists
, event_ (event)
, state_ (rssCREATING)
, complete_requests_ (0)
, rspm_ (0)
{
  // The sequence counter is shared by all routing slips, so take the
  // class-wide lock while drawing the next number.
  Routing_Slip_Guard sequence_guard (sequence_lock_);
  this->sequence_ = ++routing_slip_sequence_;

  if (DEBUG_LEVEL > 1)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%P|%t) Routing Slip #%d: constructor\n"),
                  this->sequence_));
    }
}
| void TAO_Notify::Routing_Slip::add_to_persist_queue | ( | Routing_Slip_Guard & | guard | ) | [private] |
This routing_slip needs to be saved.
Definition at line 582 of file Routing_Slip.cpp.
{
  // Give up the caller's lock on this slip first; the slip is handed to the
  // shared persistence queue only after the lock has been released.
  guard.release ();
  persistent_queue_.add (this->this_ptr_);
}
| bool TAO_Notify::Routing_Slip::all_deliveries_complete | ( | ) | const [private] |
Test to see if all deliveries are complete.
Definition at line 576 of file Routing_Slip.cpp.
{
  // Every request ever added stays in delivery_requests_, so the slip is
  // finished once the completion counter has caught up with the vector size.
  const size_t total_requests = this->delivery_requests_.size ();
  return total_requests == this->complete_requests_;
}
| void TAO_Notify::Routing_Slip::at_front_of_persist_queue | ( | ) |
This Routing_Slip reached the front of the persistence queue.
Definition at line 453 of file Routing_Slip.cpp.
{
// Called when this slip reaches the front of the persistence queue.
// Starts whichever persistence operation the current state calls for;
// each enter_state_* helper is responsible for releasing the guard.
Routing_Slip_Guard guard (this->internals_);
State state = this->state_;
switch (state)
{
case rssNEW:
{
// Never saved before: begin the initial store.
if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Routing Slip #%d: NEW Reached front of queue\n"),
this->sequence_
));
enter_state_saving (guard);
break;
}
case rssCOMPLETE_WHILE_NEW:
{
// All deliveries finished before anything was stored, so there is
// nothing to persist: give the queue slot back and finish.
if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE_WHILE_NEW Reached front of queue\n"),
this->sequence_
));
guard.release ();
this->persistent_queue_.complete ();
// NOTE(review): the guard was released above, so enter_state_terminal
// updates state_ without the lock held -- confirm this is intended.
enter_state_terminal (guard);
break;
}
case rssCHANGED:
{
// Already stored but modified since: write an update.
if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Routing Slip #%d: CHANGED Reached front of queue\n"),
this->sequence_
));
enter_state_updating (guard);
break;
}
case rssCOMPLETE:
{
// Stored and fully delivered: remove the persistent copy.
if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE Reached front of queue\n"),
this->sequence_
));
enter_state_deleting (guard);
break;
}
default:
{
// Any other state should never be at the front of the queue; log and
// let the guard's destructor release the lock.
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Routing Slip %d: Unexpected at_front_of_persist_queue in state %d\n"),
this->sequence_,
static_cast<int> (this->state_)
));
break;
}
}
}
| void TAO_Notify::Routing_Slip::continue_state_changed | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 783 of file Routing_Slip.cpp.
{
  // Another delivery finished while the slip was already waiting in the
  // CHANGED state.
  ++count_continue_changed_;

  if (! all_deliveries_complete ())
    {
      // Still work outstanding: stay in CHANGED and just drop the lock.
      guard.release ();
      return;
    }

  // That was the last delivery; enter_state_complete releases the guard.
  enter_state_complete (guard);
}
| void TAO_Notify::Routing_Slip::continue_state_changed_while_saving | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 758 of file Routing_Slip.cpp.
{
  // Another delivery finished while a save was already known to be stale.
  // Count the event so the statistics printed by create() are meaningful;
  // like the other counters this is deliberately approximate.
  ++count_continue_changed_while_saving_;
  // No state change is necessary: the slip is already marked as changed.
  guard.release ();
}
| void TAO_Notify::Routing_Slip::continue_state_new | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 604 of file Routing_Slip.cpp.
{
  // A delivery finished while the slip is still waiting for its first save.
  ++count_continue_new_;

  const bool finished = all_deliveries_complete ();
  if (finished)
    {
      // enter_state_complete_while_new leaves the guard locked, so the
      // release below applies on both paths.
      this->enter_state_complete_while_new (guard);
    }

  guard.release ();
}
| void TAO_Notify::Routing_Slip::continue_state_transient | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 668 of file Routing_Slip.cpp.
{
  // A delivery finished for a slip that is not being persisted.
  ++count_continue_transient_;

  if (! all_deliveries_complete ())
    {
      // More deliveries are pending; nothing else to do.
      guard.release ();
      return;
    }

  // Last delivery done: the slip can go away (guard released by the callee).
  enter_state_terminal (guard);
}
| Routing_Slip_Ptr TAO_Notify::Routing_Slip::create | ( | const TAO_Notify_Event::Ptr & | event | ) | [static] |
"Factory" method for normal use.
Definition at line 57 of file Routing_Slip.cpp.
{
  // Allocate the slip; ACE_NEW_THROW_EX raises CORBA::NO_MEMORY on failure.
  Routing_Slip * raw_slip;
  ACE_NEW_THROW_EX (raw_slip, Routing_Slip (event), CORBA::NO_MEMORY ());

  Routing_Slip_Ptr result (raw_slip);
  // Let the pointers touch so the slip's self-reference shares the same
  // reference count as the pointer handed back to the caller.
  result->this_ptr_ = result;

  // Every 100th slip, dump the state-transition counters at high debug
  // levels. The counters are read without a lock on purpose: ultra-precise
  // statistics are not required.
  // NOTE(review): this is debug output but is logged via ACE_ERROR/LM_ERROR.
  const bool report_statistics =
    DEBUG_LEVEL > 8 && ((result->sequence_ % 100) == 0);
  if (report_statistics)
    {
      ACE_ERROR ((LM_ERROR,
        ACE_TEXT ("(%P|%t) Routing_Slip_Statistics\n")
        ACE_TEXT (" enter_transient \t%d\n")
        ACE_TEXT (" continue_transient \t%d\n")
        ACE_TEXT (" enter_reloaded \t%d\n")
        ACE_TEXT (" enter_new \t%d\n")
        ACE_TEXT (" continue_new \t%d\n")
        ACE_TEXT (" enter_complete_while_new \t%d\n")
        ACE_TEXT (" enter_saving \t%d\n")
        ACE_TEXT (" enter_saved \t%d\n")
        ACE_TEXT (" enter_updating \t%d\n")
        ACE_TEXT (" enter_changed_while_saving \t%d\n")
        ACE_TEXT (" continue_changed_while_saving\t%d\n")
        ACE_TEXT (" enter_changed \t%d\n")
        ACE_TEXT (" continue_changed \t%d\n")
        ACE_TEXT (" enter_complete \t%d\n")
        ACE_TEXT (" enter_deleting \t%d\n")
        ACE_TEXT (" enter_terminal \t%d\n"),
        static_cast<int> (count_enter_transient_),
        static_cast<int> (count_continue_transient_),
        static_cast<int> (count_enter_reloaded_),
        static_cast<int> (count_enter_new_),
        static_cast<int> (count_continue_new_),
        static_cast<int> (count_enter_complete_while_new_),
        static_cast<int> (count_enter_saving_),
        static_cast<int> (count_enter_saved_),
        static_cast<int> (count_enter_updating_),
        static_cast<int> (count_enter_changed_while_saving_),
        static_cast<int> (count_continue_changed_while_saving_),
        static_cast<int> (count_enter_changed_),
        static_cast<int> (count_continue_changed_),
        static_cast<int> (count_enter_complete_),
        static_cast<int> (count_enter_deleting_),
        static_cast<int> (count_enter_terminal_)));
    }

  return result;
}
| Routing_Slip_Ptr TAO_Notify::Routing_Slip::create | ( | TAO_Notify_EventChannelFactory & | ecf, | |
| Routing_Slip_Persistence_Manager * | rspm | |||
| ) | [static] |
"Factory" method for use during reload from persistent storage.
Definition at line 108 of file Routing_Slip.cpp.
{
// Rebuilds a routing slip from persistent storage. Returns an unset
// Routing_Slip_Ptr if the reload or either unmarshalling step fails.
Routing_Slip_Ptr result;
// Filled in by rspm->reload(); deleted at the end of this function.
ACE_Message_Block * event_mb = 0;
ACE_Message_Block * rs_mb = 0;
try
{
if (rspm->reload (event_mb, rs_mb))
{
// First rebuild the event itself from its own message block.
TAO_InputCDR cdr_event (event_mb);
TAO_Notify_Event::Ptr event (TAO_Notify_Event::unmarshal (cdr_event));
if (event.isSet())
{
// Create a normal slip around the event, then restore its delivery
// requests from the routing-slip message block.
result = create (event);
TAO_InputCDR cdr_rs (rs_mb);
if ( result->unmarshal (ecf, cdr_rs))
{
// Attach the persistence manager only after a successful reload.
result->set_rspm (rspm);
}
else
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for routing slip.\n")
));
// Drop the partially built slip so the caller sees a failure.
result.reset ();
}
}
else
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for event.\n")
));
}
}
}
catch (const CORBA::Exception&)
{
// CORBA failures are logged and swallowed; whatever is in result is returned.
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Routing_Slip::create: Exception reloading event.\n")
));
}
// NOTE(review): the blocks are freed with delete rather than
// ACE_Message_Block::release() -- confirm that matches how reload() allocates them.
delete event_mb;
delete rs_mb;
return result;
}
| bool TAO_Notify::Routing_Slip::create_persistence_manager | ( | ) | [private] |
Definition at line 194 of file Routing_Slip.cpp.
{
  // Lazily obtain a persistence manager the first time one is needed.
  if (this->rspm_ == 0)
    {
      // The strategy is looked up by name; without it (or without a factory)
      // the slip simply stays without a persistence manager.
      Event_Persistence_Strategy * const strategy =
        ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
      Event_Persistence_Factory * const factory =
        (strategy != 0) ? strategy->get_factory () : 0;
      if (factory != 0)
        {
          set_rspm (factory->create_routing_slip_persistence_manager (this));
        }
    }

  // Report whether a persistence manager is now available.
  return this->rspm_ != 0;
}
| void TAO_Notify::Routing_Slip::delivery_request_complete | ( | size_t | request_id | ) |
A delivery request has been satisfied.
Definition at line 388 of file Routing_Slip.cpp.
{
// Records that delivery request `request_id` has been satisfied and advances
// the persistence state machine. request_id is the index of the request in
// delivery_requests_ and must be in range.
Routing_Slip_Guard guard (this->internals_);
ACE_ASSERT (request_id < this->delivery_requests_.size ());
// reset the pointer to allow the delivery_request to be deleted.
this->delivery_requests_[request_id].reset ();
this->complete_requests_ += 1;
if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Routing Slip #%d: delivery_request_complete #%d: completed %d of %d\n"),
this->sequence_,
static_cast<int> (request_id),
static_cast<int> (this->complete_requests_),
static_cast<int> (this->delivery_requests_.size ())
));
// Dispatch on the state as it was when the lock was taken. Every handler
// below receives the guard and releases it.
State state = this->state_;
switch (state)
{
case rssTRANSIENT:
{
continue_state_transient (guard);
break;
}
case rssNEW:
{
continue_state_new (guard);
break;
}
case rssSAVING:
{
// A store is in flight; remember that what is being written is now stale.
enter_state_changed_while_saving (guard);
break;
}
case rssUPDATING:
{
// Same as SAVING: the update in flight is already out of date.
enter_state_changed_while_saving (guard);
break;
}
case rssSAVED:
{
// The persistent copy no longer matches; schedule an update.
enter_state_changed (guard);
break;
}
case rssCHANGED_WHILE_SAVING:
{
continue_state_changed_while_saving (guard);
break;
}
case rssCHANGED:
{
continue_state_changed (guard);
break;
}
default:
{
// No completion is expected in the remaining states; log it and let the
// guard's destructor release the lock.
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected delivery_request_complete in state %d\n"),
static_cast<int> (this->state_)
));
break;
}
}
}
| void TAO_Notify::Routing_Slip::dispatch | ( | TAO_Notify_ProxySupplier * | proxy_supplier, | |
| bool | filter | |||
| ) |
Schedule delivery to a consumer via a proxy supplier.
| proxy_supplier | the proxy supplier that will deliver the event | |
| filter | should consumer-based filtering be applied? |
Definition at line 336 of file Routing_Slip.cpp.
{
  // Schedules delivery of this slip's event to a consumer through the proxy
  // supplier `ps`; `filter` selects whether consumer-side filtering applies.
  // If the proxy supplier has already shut down, nothing is scheduled.

  // cannot be the first action
  ACE_ASSERT (this->state_ != rssCREATING);

  // Smart pointer constructed from ps for the duration of this call.
  TAO_Notify_ProxySupplier::Ptr psgrd(ps);
  Routing_Slip_Guard guard (this->internals_);

  // The request's id is its future index in delivery_requests_.
  size_t request_id = delivery_requests_.size ();

  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
    ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Dispatch %s; completed %d of %d\n"),
    this->sequence_,
    static_cast<int> (request_id),
    filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"),
    static_cast<int> (this->complete_requests_),
    static_cast<int> (this->delivery_requests_.size ())
    ));

  Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));

  if (! ps->has_shutdown() )
    {
      this->delivery_requests_.push_back (request);
      TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter);
      // Release the slip's lock before handing the work to the proxy.
      guard.release ();
      // The format strings below are wrapped in ACE_TEXT like every other
      // log message in this class so they also build with wide characters.
      if (DEBUG_LEVEL > 8)
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to ")
                    ACE_TEXT ("proxy supplier %d\n"),
                    this->sequence_,
                    static_cast<int> (request_id),
                    ps->id()));
      ps->execute_task (method);
    }
  else
    {
      // The request was never added to delivery_requests_, so it does not
      // count towards completion.
      if (DEBUG_LEVEL > 5)
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to ")
                    ACE_TEXT ("proxy supplier %d; already shut down\n"),
                    this->sequence_,
                    static_cast<int> (request_id),
                    ps->id()));
    }
}
| void TAO_Notify::Routing_Slip::enter_state_changed | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 765 of file Routing_Slip.cpp.
{
// The persistent copy is out of date: mark the slip CHANGED (or COMPLETE if
// every delivery has finished) and queue it for the persistence thread.
++count_enter_changed_;
if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED\n"),
this->sequence_
));
// complete state change BEFORE initiating request to avoid
// race condition if request finishes before state is stable.
this->state_ = rssCHANGED;
if (all_deliveries_complete ())
{
// Overrides the state with rssCOMPLETE and releases the guard.
enter_state_complete (guard);
}
// Releases the guard and then queues the slip.
// NOTE(review): on the COMPLETE path the guard has already been released
// once; this relies on a second release() being harmless -- confirm.
add_to_persist_queue (guard);
}
| void TAO_Notify::Routing_Slip::enter_state_changed_while_saving | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 746 of file Routing_Slip.cpp.
{
  // A delivery completed while a store/update request is still outstanding.
  ++count_enter_changed_while_saving_;

  if (DEBUG_LEVEL > 8)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED_WHILE_SAVING\n"),
                  this->sequence_));
    }

  // Record the new state under the lock, then let go of it.
  this->state_ = rssCHANGED_WHILE_SAVING;
  guard.release ();
}
| void TAO_Notify::Routing_Slip::enter_state_complete | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 797 of file Routing_Slip.cpp.
{
  // All deliveries for a persisted slip have finished.
  ++count_enter_complete_;

  if (DEBUG_LEVEL > 8)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE\n"),
                  this->sequence_));
    }

  // Record the new state under the lock, then let go of it.
  this->state_ = rssCOMPLETE;
  guard.release ();
}
| void TAO_Notify::Routing_Slip::enter_state_complete_while_new | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 614 of file Routing_Slip.cpp.
{
  // Every delivery finished before the slip was ever saved.
  // The guard is intentionally left locked; the caller releases it.
  ++count_enter_complete_while_new_;
  ACE_UNUSED_ARG (guard);

  if (DEBUG_LEVEL > 8)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE_WHILE_NEW\n"),
                  this->sequence_));
    }

  // allow the ConsumerProxy to return from the CORBA push call.
  if (is_safe_ == false)
    {
      is_safe_ = true;
      this->until_safe_.signal ();
    }

  this->state_ = rssCOMPLETE_WHILE_NEW;
}
| void TAO_Notify::Routing_Slip::enter_state_deleting | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 809 of file Routing_Slip.cpp.
{
  // The slip is complete and has a persistent copy: ask the persistence
  // manager to remove it. persist_complete() finishes the transition.
  ++count_enter_deleting_;
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
    ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state DELETING\n"),
    this->sequence_
    ));
  this->state_ = rssDELETING;
  // Release the lock before calling into the persistence manager.
  guard.release ();
  // A slip can only be deleted after it has been stored, so a persistence
  // manager must exist; assert it as enter_state_updating does.
  ACE_ASSERT (this->rspm_ != 0);
  this->rspm_->remove ();
}
| void TAO_Notify::Routing_Slip::enter_state_new | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 592 of file Routing_Slip.cpp.
{
  // The slip needs to be persisted but has not been saved yet.
  ++count_enter_new_;

  if (DEBUG_LEVEL > 8)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state NEW\n"),
                  this->sequence_));
    }

  this->state_ = rssNEW;
  // Queueing releases the guard before touching the persistence queue.
  add_to_persist_queue (guard);
}
| void TAO_Notify::Routing_Slip::enter_state_reloaded | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 632 of file Routing_Slip.cpp.
{
  // Marks the slip as RELOADED and releases the guard.
  ++count_enter_reloaded_;
  // The format specifier used to read "#&d", which printed the literal text
  // and ignored the sequence number argument.
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
    ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state RELOADED\n"),
    this->sequence_
    ));
  this->state_ = rssRELOADED;
  guard.release();
}
| void TAO_Notify::Routing_Slip::enter_state_saved | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 714 of file Routing_Slip.cpp.
{
  // The persistent copy is now up to date.
  ++count_enter_saved_;

  if (DEBUG_LEVEL > 8)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVED\n"),
                  this->sequence_));
    }

  // Record the new state under the lock, then let go of it.
  this->state_ = rssSAVED;
  guard.release ();
}
| void TAO_Notify::Routing_Slip::enter_state_saving | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 681 of file Routing_Slip.cpp.
{
// Begins the first save of this slip: marshals the event and the routing
// slip and hands both to the persistence manager. If no persistence manager
// can be obtained, the slip is downgraded to a transient one instead.
++count_enter_saving_;
if (!create_persistence_manager ())
{
// Note This should actually be a throw (out of memory)
// but we cheat and make this a transient event.
guard.release ();
// Give the persistence queue slot back since nothing will be stored.
this->persistent_queue_.complete ();
// NOTE(review): the guard was released above, so enter_state_transient
// changes state_ without the lock held -- confirm this is intended.
enter_state_transient (guard);
}
else
{
if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVING\n"),
this->sequence_
));
this->state_ = rssSAVING;
// Marshal both pieces while still holding the lock.
TAO_OutputCDR event_cdr;
this->event_->marshal (event_cdr);
const ACE_Message_Block *event_mb = event_cdr.begin ();
TAO_OutputCDR rs_cdr;
marshal (rs_cdr);
const ACE_Message_Block *rs_mb = rs_cdr.begin ();
// Release the lock before calling into the persistence manager; the CDR
// objects that own the message blocks stay alive until store() returns.
guard.release ();
this->rspm_->store (*event_mb, *rs_mb);
}
}
| void TAO_Notify::Routing_Slip::enter_state_terminal | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 822 of file Routing_Slip.cpp.
{
  // Final state: the slip gives up the reference it holds on itself.
  ++count_enter_terminal_;
  // The waiting proxy consumer must have been let go before we get here.
  ACE_ASSERT (this->is_safe_);

  if (DEBUG_LEVEL > 8)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TERMINAL\n"),
                  this->sequence_));
    }

  this->state_ = rssTERMINAL;
  // Drop the self-reference first, then release the caller's guard
  // (same order as before; do not swap these two statements).
  this->this_ptr_.reset ();
  guard.release ();
}
| void TAO_Notify::Routing_Slip::enter_state_transient | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 644 of file Routing_Slip.cpp.
{
  // The event will not be persisted.
  ++count_enter_transient_;

  if (DEBUG_LEVEL > 8)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TRANSIENT\n"),
                  this->sequence_));
    }

  this->state_ = rssTRANSIENT;

  // Nothing will ever be saved, so anyone blocked in wait_persist() can go.
  if (is_safe_ == false)
    {
      is_safe_ = true;
      this->until_safe_.signal ();
    }

  if (! all_deliveries_complete ())
    {
      // Deliveries are still pending; stay TRANSIENT.
      guard.release ();
      return;
    }

  // Nothing left to deliver: finish immediately (callee releases the guard).
  enter_state_terminal (guard);
}
| void TAO_Notify::Routing_Slip::enter_state_updating | ( | Routing_Slip_Guard & | guard | ) | [private] |
Definition at line 726 of file Routing_Slip.cpp.
{
  // Rewrite the routing-slip part of an already stored slip.
  ++count_enter_updating_;

  if (DEBUG_LEVEL > 8)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state UPDATING\n"),
                  this->sequence_));
    }

  this->state_ = rssUPDATING;

  // Marshal the current delivery requests while the lock is still held.
  TAO_OutputCDR slip_cdr;
  marshal (slip_cdr);
  const ACE_Message_Block * const slip_block = slip_cdr.begin ();

  // Release the lock before calling into the persistence manager.
  guard.release ();
  ACE_ASSERT (this->rspm_ != 0);
  this->rspm_->update (*slip_block);
}
| const TAO_Notify_Event::Ptr & TAO_Notify::Routing_Slip::event | ( | ) | const |
Definition at line 213 of file Routing_Slip.cpp.
{
  // Read-only access to the event this slip is routing.
  return event_;
}
| void TAO_Notify::Routing_Slip::marshal | ( | TAO_OutputCDR & | cdr | ) | [private] |
Marshal into a CDR.
Definition at line 836 of file Routing_Slip.cpp.
{
  // Write the number of requests still outstanding, followed by each of
  // those requests. Completed requests were reset to null by
  // delivery_request_complete() and are skipped.
  const size_t total = this->delivery_requests_.size ();
  const size_t outstanding = total - this->complete_requests_;
  cdr.write_ulong (ACE_Utils::truncate_cast<CORBA::ULong> (outstanding));

  for (size_t index = 0; index != total; ++index)
    {
      Delivery_Request * const pending = this->delivery_requests_[index].get ();
      if (pending == 0)
        {
          continue;
        }
      pending->marshal (cdr);
    }
}
| void TAO_Notify::Routing_Slip::persist_complete | ( | ) | [virtual] |
The persistent storage has completed the last request.
Implements TAO_Notify::Persistent_Callback.
Definition at line 510 of file Routing_Slip.cpp.
{
// Callback invoked when the persistent storage has finished the last
// store/update/remove request. Advances the state machine and then tells
// the persistence queue that this slip's turn is over.
// keep this object around til this method returns.
Routing_Slip_Ptr me(this->this_ptr_);
Routing_Slip_Guard guard (this->internals_);
ACE_ASSERT (guard.locked ());
// allow the ConsumerProxy to return from the CORBA push call.
if (! is_safe_)
{
is_safe_ = true;
this->until_safe_.signal ();
}
// Every case below releases the guard, directly or via an enter_state_* helper.
State state = this->state_;
switch (state)
{
case rssSAVING:
{
// Initial store finished and nothing changed meanwhile.
if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Routing Slip #%d: SAVING persist complete\n"),
this->sequence_
));
enter_state_saved(guard);
break;
}
case rssCHANGED_WHILE_SAVING:
{
// What was just written is already stale; queue another pass.
enter_state_changed (guard);
break;
}
case rssUPDATING:
{
// Update finished and nothing changed meanwhile.
if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Routing Slip #%d: UPDATING persist complete\n"),
this->sequence_
));
enter_state_saved (guard);
break;
}
case rssDELETING:
{
// Persistent copy removed; the slip is finished. The local pointer
// `me` keeps the object alive after this_ptr_ is reset.
if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Routing Slip #%d: DELETING persist complete\n"),
this->sequence_
));
enter_state_terminal (guard);
break;
}
default:
{
ACE_ERROR ((LM_ERROR,
ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected transition in state %d\n"),
static_cast<int> (this->state_)
));
guard.release ();
break;
}
}
// Runs on every path, after the guard has been released.
this->persistent_queue_.complete ();
}
| void TAO_Notify::Routing_Slip::reconnect | ( | void | ) |
Definition at line 908 of file Routing_Slip.cpp.
{
  // Move the slip to SAVED, then re-run the delivery methods collected by
  // unmarshal().
  Routing_Slip_Guard guard (this->internals_);
  // enter_state_saved releases the guard, so the methods below execute
  // without the slip's lock held.
  enter_state_saved (guard);

  //@@todo is there a worker_task available to do this?
  const size_t method_count = this->delivery_methods_.size ();
  size_t index = 0;
  while (index < method_count)
    {
      this->delivery_methods_[index]->execute ();
      ++index;
    }

  // Each method is executed once only.
  this->delivery_methods_.clear ();
}
| void TAO_Notify::Routing_Slip::route | ( | TAO_Notify_ProxyConsumer * | pc, | |
| bool | reliable_channel | |||
| ) |
Route this event to its destinations. This must be the first action requested after the routing slip is created.
Definition at line 229 of file Routing_Slip.cpp.
{
// Starts routing of this slip's event on behalf of proxy consumer `pc`.
// On the first call (state rssCREATING) this also decides whether the slip
// is persisted (NEW) or not (TRANSIENT). `reliable_channel` says whether
// the channel asks for reliable delivery.
ACE_ASSERT(pc != 0);
// Smart pointer constructed from pc for the duration of this call.
TAO_Notify_ProxyConsumer::Ptr pcgrd(pc);
Routing_Slip_Guard guard (this->internals_);
// The request's id is its index in delivery_requests_.
size_t request_id = delivery_requests_.size ();
if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: lookup, completed %d of %d\n"),
this->sequence_,
static_cast<int> (request_id),
static_cast<int> (this->complete_requests_),
static_cast<int> (this->delivery_requests_.size ())
));
Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
this->delivery_requests_.push_back (request);
TAO_Notify_Method_Request_Lookup_Queueable method (request, pc);
if (this->state_ == rssCREATING)
{
if (! reliable_channel)
{
// The channel does not ask for reliability: never persist.
enter_state_transient (guard);
}
else if (ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence") == 0)
{
// No persistence strategy is configured: cannot persist.
enter_state_transient (guard);
}
else if (! this->event_->reliable().is_valid())
{
// The event does not specify reliability itself; persist it.
enter_state_new (guard);
}
else if (this->event_->reliable().value() == true)
{
// The event explicitly asks for reliable delivery.
enter_state_new (guard);
}
else
{
// The event explicitly opts out of reliable delivery.
enter_state_transient (guard);
}
}
else
{
// The state was not rssCREATING, so no enter_state_* call was made to
// release the guard for us; release it explicitly here. (In the
// rssCREATING branch above, every enter_state_* call releases it.)
guard.release ();
}
// Hand the lookup request to the proxy consumer with the lock released.
pc->execute_task (method);
}
| int TAO_Notify::Routing_Slip::sequence | ( | ) | const |
Provide an identifying number for this Routing Slip to use in debug messages.
Definition at line 923 of file Routing_Slip.cpp.
{
  // Identifier assigned in the constructor; used in debug messages.
  return sequence_;
}
| void TAO_Notify::Routing_Slip::set_rspm | ( | Routing_Slip_Persistence_Manager * | rspm | ) |
Definition at line 158 of file Routing_Slip.cpp.
| bool TAO_Notify::Routing_Slip::should_retry | ( | ) | const |
Should delivery of this event be retried if it fails?
Definition at line 929 of file Routing_Slip.cpp.
{
  // Simple-minded rule: a transient event is never retried, anything else is.
  // @@todo Eventually this should check timeout, discard policy, etc.
  const bool is_transient = (this->state_ == rssTRANSIENT);
  return ! is_transient;
}
| bool TAO_Notify::Routing_Slip::unmarshal | ( | TAO_Notify_EventChannelFactory & | ecf, | |
| TAO_InputCDR & | rscdr | |||
| ) | [private] |
Unmarshal from a CDR.
Definition at line 852 of file Routing_Slip.cpp.
{
  // Rebuilds the delivery requests and their restartable methods from a CDR
  // stream. Returns true if at least one delivery request is present
  // afterwards. Failures in individual requests are ignored.

  // Request count, written first by marshal().
  CORBA::ULong count = 0;
  cdr.read_ulong (count);
  for (size_t nreq = 0; nreq < count; ++nreq)
    {
      // NOTE(review): the inner loop keeps reading codes until the stream is
      // exhausted, so the outer loop only does work on its first pass.
      ACE_CDR::Octet code = 0;
      while (cdr.read_octet(code))
        {
          try
            {
              if (code == TAO_Notify_Method_Request_Dispatch::persistence_code)
                {
                  Delivery_Request * prequest = 0;
                  ACE_NEW_THROW_EX (
                    prequest,
                    Delivery_Request(this_ptr_, this->delivery_requests_.size ()),
                    CORBA::NO_MEMORY ());
                  Delivery_Request_Ptr request(prequest);
                  TAO_Notify_Method_Request_Dispatch_Queueable * method =
                    TAO_Notify_Method_Request_Dispatch::unmarshal (
                      request,
                      ecf,
                      cdr);
                  if (method != 0)
                    {
                      this->delivery_requests_.push_back (request);
                      this->delivery_methods_.push_back (method);
                    }
                }
              else if (code == TAO_Notify_Method_Request_Lookup::persistence_code)
                {
                  // Allocate the same way as the Dispatch branch so that an
                  // allocation failure raises CORBA::NO_MEMORY and is handled
                  // by the catch below.
                  Delivery_Request * prequest = 0;
                  ACE_NEW_THROW_EX (
                    prequest,
                    Delivery_Request(this_ptr_, this->delivery_requests_.size ()),
                    CORBA::NO_MEMORY ());
                  Delivery_Request_Ptr request(prequest);
                  TAO_Notify_Method_Request_Lookup_Queueable * method =
                    TAO_Notify_Method_Request_Lookup::unmarshal (
                      request,
                      ecf,
                      cdr);
                  if (method != 0)
                    {
                      this->delivery_requests_.push_back (request);
                      this->delivery_methods_.push_back (method);
                    }
                }
            }
          catch (const CORBA::Exception&)
            {
              // @@todo should we log this?
              // just ignore failures
            }
        }
    }
  return this->delivery_requests_.size () > 0;
}
| void TAO_Notify::Routing_Slip::wait_persist | ( | ) |
Wait until the event/routing_slip has been saved at least once.
Definition at line 219 of file Routing_Slip.cpp.
{
  // Block until is_safe_ has been set. The flag is re-checked after every
  // wake-up, with the slip's lock held.
  Routing_Slip_Guard guard (this->internals_);
  for (;;)
    {
      if (this->is_safe_)
        {
          break;
        }
      this->until_safe_.wait ();
    }
}
size_t TAO_Notify::Routing_Slip::complete_requests_ [private] |
How many delivery requests are complete.
Definition at line 207 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_continue_changed_ = 0 [static, private] |
Definition at line 228 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_continue_changed_while_saving_ = 0 [static, private] |
Definition at line 226 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_continue_new_ = 0 [static, private] |
Definition at line 220 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_continue_transient_ = 0 [static, private] |
Definition at line 217 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_changed_ = 0 [static, private] |
Definition at line 227 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_changed_while_saving_ = 0 [static, private] |
Definition at line 225 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_complete_ = 0 [static, private] |
Definition at line 229 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_complete_while_new_ = 0 [static, private] |
Definition at line 221 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_deleting_ = 0 [static, private] |
Definition at line 230 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_new_ = 0 [static, private] |
Definition at line 219 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_reloaded_ = 0 [static, private] |
Definition at line 218 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_saved_ = 0 [static, private] |
Definition at line 223 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_saving_ = 0 [static, private] |
Definition at line 222 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_terminal_ = 0 [static, private] |
Definition at line 231 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_transient_ = 0 [static, private] |
Definition at line 216 of file Routing_Slip.h.
size_t TAO_Notify::Routing_Slip::count_enter_updating_ = 0 [static, private] |
Definition at line 224 of file Routing_Slip.h.
Methods that should be restarted during event recovery.
Definition at line 204 of file Routing_Slip.h.
A collection of delivery requests.
Definition at line 201 of file Routing_Slip.h.
Definition at line 179 of file Routing_Slip.h.
TAO_SYNCH_MUTEX TAO_Notify::Routing_Slip::internals_ [private] |
Protection for internal information.
Definition at line 167 of file Routing_Slip.h.
bool TAO_Notify::Routing_Slip::is_safe_ [private] |
true when event persistence qos is guaranteed
Definition at line 169 of file Routing_Slip.h.
Routing_Slip_Queue TAO_Notify::Routing_Slip::persistent_queue_ [static, private] |
Definition at line 233 of file Routing_Slip.h.
int TAO_Notify::Routing_Slip::routing_slip_sequence_ = 0 [static, private] |
Definition at line 215 of file Routing_Slip.h.
Pointer to a Routing_Slip_Persistence_Manager.
Definition at line 210 of file Routing_Slip.h.
int TAO_Notify::Routing_Slip::sequence_ [private] |
Definition at line 212 of file Routing_Slip.h.
TAO_SYNCH_MUTEX TAO_Notify::Routing_Slip::sequence_lock_ [static, private] |
Definition at line 214 of file Routing_Slip.h.
enum TAO_Notify::Routing_Slip::State TAO_Notify::Routing_Slip::state_ [private] |
A mini-state machine to control persistence. See the external documentation for circles and arrows.
Smart pointer to this object. Provides continuity between smart pointers and "Routing_Slip::this". Also lets the Routing_Slip manage its own minimum lifetime.
Definition at line 176 of file Routing_Slip.h.
ACE_SYNCH_CONDITION TAO_Notify::Routing_Slip::until_safe_ [private] |
signalled when is_safe_ goes true
Definition at line 171 of file Routing_Slip.h.
1.7.0