TAO_Notify::Routing_Slip Class Reference

Class which manages the delivery of events to destination. More...

#include <Routing_Slip.h>

Inheritance diagram for TAO_Notify::Routing_Slip:

Inheritance graph
[legend]
Collaboration diagram for TAO_Notify::Routing_Slip:

Collaboration graph
[legend]
List of all members.

Public Member Functions

void set_rspm (Routing_Slip_Persistence_Manager *rspm)
void reconnect ()
virtual ~Routing_Slip ()
 destructor (should be private but that inspires compiler wars)

void route (TAO_Notify_ProxyConsumer *pc, bool reliable_channel)
void dispatch (TAO_Notify_ProxySupplier *proxy_supplier, bool filter)
 Schedule delivery to a consumer via a proxy supplier.

void wait_persist ()
 Wait until the event/routing_slip has been saved at least once.

void delivery_request_complete (size_t request_id)
 A delivery request has been satisfied.

void at_front_of_persist_queue ()
 This Routing_Slip reached the front of the persistence queue.

virtual void persist_complete ()
 The persistent storage has completed the last request.

const TAO_Notify_Event::Ptrevent () const
int sequence () const
 Provide an identifying number for this Routing Slip to use in debug messages.

bool should_retry () const
 Should delivery of this event be retried if it fails?


Static Public Member Functions

Routing_Slip_Ptr create (const TAO_Notify_Event::Ptr &event)
 "Factory" method for normal use.

Routing_Slip_Ptr create (TAO_Notify_EventChannelFactory &ecf, Routing_Slip_Persistence_Manager *rspm)
 "Factory" method for use during reload from persistent storage.


Private Types

typedef ACE_Guard< TAO_SYNCH_MUTEXRouting_Slip_Guard
enum  State {
  rssCREATING, rssTRANSIENT, rssRELOADED, rssNEW,
  rssCOMPLETE_WHILE_NEW, rssSAVING, rssSAVED, rssUPDATING,
  rssCHANGED_WHILE_SAVING, rssCHANGED, rssCOMPLETE, rssDELETING,
  rssTERMINAL
}

Private Member Functions

void enter_state_transient (Routing_Slip_Guard &guard)
void continue_state_transient (Routing_Slip_Guard &guard)
void enter_state_reloaded (Routing_Slip_Guard &guard)
void enter_state_new (Routing_Slip_Guard &guard)
void continue_state_new (Routing_Slip_Guard &guard)
void enter_state_complete_while_new (Routing_Slip_Guard &guard)
void enter_state_saving (Routing_Slip_Guard &guard)
void enter_state_saved (Routing_Slip_Guard &guard)
void enter_state_updating (Routing_Slip_Guard &guard)
void enter_state_changed_while_saving (Routing_Slip_Guard &guard)
void continue_state_changed_while_saving (Routing_Slip_Guard &guard)
void enter_state_changed (Routing_Slip_Guard &guard)
void continue_state_changed (Routing_Slip_Guard &guard)
void enter_state_complete (Routing_Slip_Guard &guard)
void enter_state_deleting (Routing_Slip_Guard &guard)
void enter_state_terminal (Routing_Slip_Guard &guard)
bool create_persistence_manager ()
 Routing_Slip (const TAO_Notify_Event::Ptr &event)
 Private constructor for use by create method.

bool all_deliveries_complete () const
 Test to see if all deliveries are complete.

void add_to_persist_queue (Routing_Slip_Guard &guard)
 This routing_slip needs to be saved.

void marshal (TAO_OutputCDR &cdr)
 Marshal into a CDR.

bool unmarshal (TAO_Notify_EventChannelFactory &ecf, TAO_InputCDR &rscdr)
 Marshal from CDR.


Private Attributes

TAO_SYNCH_MUTEX internals_
 Protection for internal information.

bool is_safe_
 true when event persistence qos is guaranteed

ACE_SYNCH_CONDITION until_safe_
 signalled when is_safe_ goes true

Routing_Slip_Ptr this_ptr_
TAO_Notify_Event::Ptr event_
enum TAO_Notify::Routing_Slip::State state_
Delivery_Request_Vec delivery_requests_
 A collection of delivery requests.

Delivery_Method_Vec delivery_methods_
 Methods that should be restarted during event recovery.

size_t complete_requests_
 How many delivery requests are complete.

Routing_Slip_Persistence_Managerrspm_
 Pointer to a Routing_Slip_Persistence_Manager.

int sequence_

Static Private Attributes

TAO_SYNCH_MUTEX sequence_lock_
int routing_slip_sequence_ = 0
size_t count_enter_transient_ = 0
size_t count_continue_transient_ = 0
size_t count_enter_reloaded_ = 0
size_t count_enter_new_ = 0
size_t count_continue_new_ = 0
size_t count_enter_complete_while_new_ = 0
size_t count_enter_saving_ = 0
size_t count_enter_saved_ = 0
size_t count_enter_updating_ = 0
size_t count_enter_changed_while_saving_ = 0
size_t count_continue_changed_while_saving_ = 0
size_t count_enter_changed_ = 0
size_t count_continue_changed_ = 0
size_t count_enter_complete_ = 0
size_t count_enter_deleting_ = 0
size_t count_enter_terminal_ = 0
Routing_Slip_Queue persistent_queue_

Detailed Description

Class which manages the delivery of events to destination.

Interacts with persistent storage to provide reliable delivery.

Definition at line 66 of file Routing_Slip.h.


Member Typedef Documentation

typedef ACE_Guard< TAO_SYNCH_MUTEX > TAO_Notify::Routing_Slip::Routing_Slip_Guard [private]
 

Definition at line 68 of file Routing_Slip.h.

Referenced by add_to_persist_queue(), at_front_of_persist_queue(), continue_state_changed(), continue_state_changed_while_saving(), continue_state_new(), continue_state_transient(), delivery_request_complete(), dispatch(), enter_state_changed(), enter_state_changed_while_saving(), enter_state_complete(), enter_state_complete_while_new(), enter_state_deleting(), enter_state_new(), enter_state_reloaded(), enter_state_saved(), enter_state_saving(), enter_state_terminal(), enter_state_transient(), enter_state_updating(), persist_complete(), reconnect(), route(), Routing_Slip(), and wait_persist().


Member Enumeration Documentation

enum TAO_Notify::Routing_Slip::State [private]
 

A mini-state machine to control persistence See external doc for circles and arrows.

Enumeration values:
rssCREATING 
rssTRANSIENT 
rssRELOADED 
rssNEW 
rssCOMPLETE_WHILE_NEW 
rssSAVING 
rssSAVED 
rssUPDATING 
rssCHANGED_WHILE_SAVING 
rssCHANGED 
rssCOMPLETE 
rssDELETING 
rssTERMINAL 

Definition at line 184 of file Routing_Slip.h.

00185   {
00186     rssCREATING,
00187     rssTRANSIENT,
00188     rssRELOADED,
00189     rssNEW,
00190     rssCOMPLETE_WHILE_NEW,
00191     rssSAVING,
00192     rssSAVED,
00193     rssUPDATING,
00194     rssCHANGED_WHILE_SAVING,
00195     rssCHANGED,
00196     rssCOMPLETE,
00197     rssDELETING,
00198     rssTERMINAL
00199   } state_;


Constructor & Destructor Documentation

TAO_Notify::Routing_Slip::~Routing_Slip  )  [virtual]
 

destructor (should be private but that inspires compiler wars)

Definition at line 188 of file Routing_Slip.cpp.

References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, and LM_DEBUG.

00189 {
00190   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00191       ACE_TEXT ("(%P|%t) Routing Slip #%d: destructor\n"),
00192       this->sequence_
00193       ));
00194 }

TAO_Notify::Routing_Slip::Routing_Slip const TAO_Notify_Event::Ptr event  )  [private]
 

Private constructor for use by create method.

Definition at line 170 of file Routing_Slip.cpp.

References ACE_DEBUG, ACE_TEXT(), DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, routing_slip_sequence_, sequence_, and sequence_lock_.

00172   : is_safe_ (false)
00173   , until_safe_ (internals_)
00174   , this_ptr_ (0)
00175   , event_(event)
00176   , state_ (rssCREATING)
00177   , complete_requests_ (0)
00178   , rspm_ (0)
00179 {
00180   Routing_Slip_Guard guard (sequence_lock_);
00181   this->sequence_ = ++routing_slip_sequence_;
00182   if (DEBUG_LEVEL > 1) ACE_DEBUG ((LM_DEBUG,
00183       ACE_TEXT ("(%P|%t) Routing Slip #%d: constructor\n"),
00184       this->sequence_
00185       ));
00186 }


Member Function Documentation

void TAO_Notify::Routing_Slip::add_to_persist_queue Routing_Slip_Guard guard  )  [private]
 

This routing_slip needs to be saved.

Definition at line 578 of file Routing_Slip.cpp.

References ACE_Guard< ACE_LOCK >::acquire(), TAO_Notify::Routing_Slip_Queue::add(), persistent_queue_, ACE_Guard< ACE_LOCK >::release(), and Routing_Slip_Guard.

Referenced by enter_state_changed(), and enter_state_new().

00579 {
00580   guard.release ();
00581   this->persistent_queue_.add (this->this_ptr_);
00582   guard.acquire (); // necessary?
00583 }

bool TAO_Notify::Routing_Slip::all_deliveries_complete  )  const [private]
 

Test to see if all deliveries are complete.

Definition at line 572 of file Routing_Slip.cpp.

References complete_requests_, delivery_requests_, and ACE_Vector< T, DEFAULT_SIZE >::size().

Referenced by continue_state_changed(), continue_state_new(), continue_state_transient(), enter_state_changed(), and enter_state_transient().

00573 {
00574   return this->complete_requests_ == this->delivery_requests_.size ();
00575 }

void TAO_Notify::Routing_Slip::at_front_of_persist_queue  ) 
 

This Routing_Slip reached the front of the persistence queue.

Definition at line 451 of file Routing_Slip.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), TAO_Notify::Routing_Slip_Queue::complete(), DEBUG_LEVEL, enter_state_deleting(), enter_state_saving(), enter_state_terminal(), enter_state_updating(), LM_DEBUG, LM_ERROR, persistent_queue_, Routing_Slip_Guard, rssCHANGED, rssCOMPLETE, rssCOMPLETE_WHILE_NEW, and rssNEW.

00452 {
00453   Routing_Slip_Guard guard (this->internals_);
00454   State state = this->state_;
00455   switch (state)
00456   {
00457     case rssNEW:
00458     {
00459       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00460         ACE_TEXT ("(%P|%t) Routing Slip #%d: NEW Reached front of queue\n"),
00461         this->sequence_
00462         ));
00463       enter_state_saving (guard);
00464       break;
00465     }
00466     case rssCOMPLETE_WHILE_NEW:
00467     {
00468       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00469         ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE_WHILE_NEW Reached front of queue\n"),
00470         this->sequence_
00471         ));
00472       this->persistent_queue_.complete ();
00473       enter_state_terminal (guard);
00474       break;
00475     }
00476     case rssCHANGED:
00477     {
00478       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00479         ACE_TEXT ("(%P|%t) Routing Slip #%d: CHANGED Reached front of queue\n"),
00480         this->sequence_
00481         ));
00482       enter_state_updating (guard);
00483       break;
00484     }
00485     case rssCOMPLETE:
00486     {
00487       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00488         ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE Reached front of queue\n"),
00489         this->sequence_
00490         ));
00491       enter_state_deleting (guard);
00492       break;
00493     }
00494     default:
00495     {
00496       ACE_ERROR ((LM_ERROR,
00497         ACE_TEXT ("(%P|%t) Routing Slip %d: Unexpected at_front_of_persist_queue in state %d\n"),
00498         this->sequence_,
00499         static_cast<int> (this->state_)
00500         ));
00501       break;
00502     }
00503   }
00504 }

void TAO_Notify::Routing_Slip::continue_state_changed Routing_Slip_Guard guard  )  [private]
 

Definition at line 774 of file Routing_Slip.cpp.

References all_deliveries_complete(), count_continue_changed_, enter_state_complete(), and Routing_Slip_Guard.

Referenced by delivery_request_complete().

00775 {
00776   ++count_continue_changed_;
00777   if (all_deliveries_complete ())
00778   {
00779     enter_state_complete (guard);
00780   }
00781 }

void TAO_Notify::Routing_Slip::continue_state_changed_while_saving Routing_Slip_Guard guard  )  [private]
 

Definition at line 749 of file Routing_Slip.cpp.

References Routing_Slip_Guard.

Referenced by delivery_request_complete().

00750 {
00751   ACE_UNUSED_ARG (guard);
00752   // no action necessary
00753 }

void TAO_Notify::Routing_Slip::continue_state_new Routing_Slip_Guard guard  )  [private]
 

Definition at line 601 of file Routing_Slip.cpp.

References all_deliveries_complete(), count_continue_new_, enter_state_complete_while_new(), and Routing_Slip_Guard.

Referenced by delivery_request_complete().

00602 {
00603   ++count_continue_new_;
00604   if (all_deliveries_complete ())
00605   {
00606     this->enter_state_complete_while_new (guard);
00607   }
00608 }

void TAO_Notify::Routing_Slip::continue_state_transient Routing_Slip_Guard guard  )  [private]
 

Definition at line 661 of file Routing_Slip.cpp.

References all_deliveries_complete(), count_continue_transient_, enter_state_terminal(), and Routing_Slip_Guard.

Referenced by delivery_request_complete().

00662 {
00663   ++count_continue_transient_;
00664   if (all_deliveries_complete ())
00665   {
00666     enter_state_terminal (guard);
00667   }
00668 }

Routing_Slip_Ptr TAO_Notify::Routing_Slip::create TAO_Notify_EventChannelFactory ecf,
Routing_Slip_Persistence_Manager rspm
[static]
 

"Factory" method for use during reload from persistent storage.

Definition at line 108 of file Routing_Slip.cpp.

References ACE_CATCHANY, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_ERROR, ACE_TEXT(), ACE_TRY, ACE_TRY_CHECK, create(), event(), LM_ERROR, TAO_Notify::Routing_Slip_Persistence_Manager::reload(), ACE_Strong_Bound_Ptr< X, ACE_LOCK >::reset(), and TAO_Notify::Routing_Slip_Ptr.

00111 {
00112   Routing_Slip_Ptr result;
00113   ACE_Message_Block * event_mb = 0;
00114   ACE_Message_Block * rs_mb = 0;
00115   ACE_DECLARE_NEW_ENV;
00116   ACE_TRY
00117     {
00118       if (rspm->reload (event_mb, rs_mb))
00119       {
00120         TAO_InputCDR cdr_event (event_mb);
00121         TAO_Notify_Event::Ptr event (TAO_Notify_Event::unmarshal (cdr_event));
00122         if (event.isSet())
00123         {
00124           result = create (event ACE_ENV_ARG_PARAMETER);
00125           ACE_TRY_CHECK;
00126           TAO_InputCDR cdr_rs (rs_mb);
00127           if ( result->unmarshal (ecf, cdr_rs))
00128           {
00129             result->set_rspm (rspm);
00130           }
00131           else
00132           {
00133             ACE_ERROR ((LM_ERROR,
00134               ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for routing slip.\n")
00135               ));
00136             result.reset ();
00137           }
00138         }
00139         else
00140         {
00141           ACE_ERROR ((LM_ERROR,
00142             ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for event.\n")
00143             ));
00144         }
00145       }
00146     }
00147   ACE_CATCHANY
00148     {
00149       ACE_ERROR ((LM_ERROR,
00150         ACE_TEXT ("(%P|%t) Routing_Slip::create: Exception reloading event.\n")
00151         ));
00152     }
00153   ACE_ENDTRY;
00154   delete event_mb;
00155   delete rs_mb;
00156 
00157   return result;
00158 }

Routing_Slip_Ptr TAO_Notify::Routing_Slip::create const TAO_Notify_Event::Ptr event  )  [static]
 

"Factory" method for normal use.

Definition at line 56 of file Routing_Slip.cpp.

References ACE_CHECK_RETURN, ACE_ERROR, ACE_NEW_THROW_EX, ACE_TEXT(), count_continue_changed_, count_continue_changed_while_saving_, count_continue_new_, count_continue_transient_, count_enter_changed_, count_enter_changed_while_saving_, count_enter_complete_, count_enter_complete_while_new_, count_enter_deleting_, count_enter_new_, count_enter_reloaded_, count_enter_saved_, count_enter_saving_, count_enter_terminal_, count_enter_transient_, count_enter_updating_, DEBUG_LEVEL, LM_ERROR, and TAO_Notify::Routing_Slip_Ptr.

Referenced by create(), TAO_Notify_EventChannelFactory::load_event_persistence(), and TAO_Notify_ProxyConsumer::push_i().

00057 {
00058   Routing_Slip * prs;
00059   ACE_NEW_THROW_EX (prs, Routing_Slip (event), CORBA::NO_MEMORY ());
00060   ACE_CHECK_RETURN (Routing_Slip_Ptr());
00061   Routing_Slip_Ptr result(prs);
00062   result->this_ptr_ = result; // let the pointers touch so they use the same ref count
00063 
00064   // note we don't care about ultra-precise stats, so no guard for these
00065   if (DEBUG_LEVEL > 8 && ((result->sequence_ % 100) == 0))
00066   {
00067     ACE_ERROR ((LM_ERROR,
00068       ACE_TEXT ("(%P|%t) Routing_Slip_Statistics\n")
00069       ACE_TEXT ("  enter_transient              \t%d\n")
00070       ACE_TEXT ("  continue_transient           \t%d\n")
00071       ACE_TEXT ("  enter_reloaded               \t%d\n")
00072       ACE_TEXT ("  enter_new                    \t%d\n")
00073       ACE_TEXT ("  continue_new                 \t%d\n")
00074       ACE_TEXT ("  enter_complete_while_new     \t%d\n")
00075       ACE_TEXT ("  enter_saving                 \t%d\n")
00076       ACE_TEXT ("  enter_saved                  \t%d\n")
00077       ACE_TEXT ("  enter_updating               \t%d\n")
00078       ACE_TEXT ("  enter_changed_while_saving   \t%d\n")
00079       ACE_TEXT ("  continue_changed_while_saving\t%d\n")
00080       ACE_TEXT ("  enter_changed                \t%d\n")
00081       ACE_TEXT ("  continue_changed             \t%d\n")
00082       ACE_TEXT ("  enter_complete               \t%d\n")
00083       ACE_TEXT ("  enter_deleting               \t%d\n")
00084       ACE_TEXT ("  enter_terminal               \t%d\n")
00085       , static_cast<int> (count_enter_transient_)
00086       , static_cast<int> (count_continue_transient_)
00087       , static_cast<int> (count_enter_reloaded_)
00088       , static_cast<int> (count_enter_new_)
00089       , static_cast<int> (count_continue_new_)
00090       , static_cast<int> (count_enter_complete_while_new_)
00091       , static_cast<int> (count_enter_saving_)
00092       , static_cast<int> (count_enter_saved_)
00093       , static_cast<int> (count_enter_updating_)
00094       , static_cast<int> (count_enter_changed_while_saving_)
00095       , static_cast<int> (count_continue_changed_while_saving_)
00096       , static_cast<int> (count_enter_changed_)
00097       , static_cast<int> (count_continue_changed_)
00098       , static_cast<int> (count_enter_complete_)
00099       , static_cast<int> (count_enter_deleting_)
00100       , static_cast<int> (count_enter_terminal_)
00101       ));
00102   }
00103   return result;
00104 }

bool TAO_Notify::Routing_Slip::create_persistence_manager  )  [private]
 

Definition at line 197 of file Routing_Slip.cpp.

References TAO_Notify::Event_Persistence_Factory::create_routing_slip_persistence_manager(), TAO_Notify::Event_Persistence_Strategy::get_factory(), rspm_, and set_rspm().

Referenced by enter_state_saving().

00198 {
00199   if (this->rspm_ == 0)
00200   {
00201     Event_Persistence_Strategy * strategy =
00202       ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
00203     if (strategy != 0)
00204     {
00205       Event_Persistence_Factory * factory = strategy->get_factory ();
00206       if (factory != 0)
00207       {
00208         set_rspm (factory->create_routing_slip_persistence_manager(this));
00209       }
00210     }
00211   }
00212   return this->rspm_ != 0;
00213 }

void TAO_Notify::Routing_Slip::delivery_request_complete size_t  request_id  ) 
 

A delivery request has been satisfied.

Definition at line 386 of file Routing_Slip.cpp.

References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, ACE_TEXT(), complete_requests_, continue_state_changed(), continue_state_changed_while_saving(), continue_state_new(), continue_state_transient(), DEBUG_LEVEL, delivery_requests_, enter_state_changed(), enter_state_changed_while_saving(), LM_DEBUG, LM_ERROR, Routing_Slip_Guard, rssCHANGED, rssCHANGED_WHILE_SAVING, rssNEW, rssSAVED, rssSAVING, rssTRANSIENT, rssUPDATING, and ACE_Vector< T, DEFAULT_SIZE >::size().

00387 {
00388   Routing_Slip_Guard guard (this->internals_);
00389   ACE_ASSERT (request_id < this->delivery_requests_.size ());
00390   // reset the pointer to allow the delivery_request to be deleted.
00391   this->delivery_requests_[request_id].reset ();
00392   this->complete_requests_ += 1;
00393 
00394   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00395       ACE_TEXT ("(%P|%t) Routing Slip #%d: delivery_request_complete #%d: completed %d of %d\n"),
00396       this->sequence_,
00397       static_cast<int> (request_id),
00398       static_cast<int> (this->complete_requests_),
00399       static_cast<int> (this->delivery_requests_.size ())
00400       ));
00401   State state = this->state_;
00402   switch (state)
00403   {
00404     case rssTRANSIENT:
00405     {
00406       continue_state_transient (guard);
00407       break;
00408     }
00409     case rssNEW:
00410     {
00411       continue_state_new (guard);
00412       break;
00413     }
00414     case rssSAVING:
00415     {
00416       enter_state_changed_while_saving (guard);
00417       break;
00418     }
00419     case rssUPDATING:
00420     {
00421       enter_state_changed_while_saving (guard);
00422       break;
00423     }
00424     case rssSAVED:
00425     {
00426       enter_state_changed (guard);
00427       break;
00428     }
00429     case rssCHANGED_WHILE_SAVING:
00430     {
00431       continue_state_changed_while_saving (guard);
00432       break;
00433     }
00434     case rssCHANGED:
00435     {
00436       continue_state_changed (guard);
00437       break;
00438     }
00439     default:
00440     {
00441       ACE_ERROR ((LM_ERROR,
00442         ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected delivery_request_complete in state %d\n"),
00443         static_cast<int> (this->state_)
00444         ));
00445       break;
00446     }
00447   }
00448 }

void TAO_Notify::Routing_Slip::dispatch TAO_Notify_ProxySupplier proxy_supplier,
bool  filter
 

Schedule delivery to a consumer via a proxy supplier.

Parameters:
proxy_supplier the proxy supplier that will deliver the event
filter should consumer-based filtering be applied?

Definition at line 332 of file Routing_Slip.cpp.

References ACE_ASSERT, ACE_CHECK, ACE_DEBUG, ACE_ENV_ARG_PARAMETER, ACE_TEXT(), DEBUG_LEVEL, TAO_Notify::Delivery_Request_Ptr, delivery_requests_, TAO_Notify_Object::execute_task(), TAO_Notify_Object::has_shutdown(), TAO_Notify_Object::id(), LM_DEBUG, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rssCREATING, and ACE_Vector< T, DEFAULT_SIZE >::size().

00336 {
00337   // cannot be the first action
00338   ACE_ASSERT (this->state_ != rssCREATING);
00339 
00340   TAO_Notify_ProxySupplier::Ptr psgrd(ps);
00341   Routing_Slip_Guard guard (this->internals_);
00342 
00343   size_t request_id = delivery_requests_.size ();
00344 
00345   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00346       ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Dispatch %s; completed %d of %d\n"),
00347       this->sequence_,
00348       static_cast<int> (request_id),
00349       filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"),
00350       static_cast<int> (this->complete_requests_),
00351       static_cast<int> (this->delivery_requests_.size ())
00352       ));
00353 
00354   Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00355   if (! ps->has_shutdown() )
00356     {
00357       this->delivery_requests_.push_back (request);
00358       TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter);
00359       guard.release ();
00360       if (DEBUG_LEVEL > 8)
00361         ACE_DEBUG ((LM_DEBUG,
00362                     "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to "
00363                     "proxy supplier %d\n",
00364                     this->sequence_,
00365                     static_cast<int> (request_id),
00366                     ps->id()));
00367       ps->execute_task (method ACE_ENV_ARG_PARAMETER);
00368       ACE_CHECK;
00369     }
00370   else
00371     {
00372       if (DEBUG_LEVEL > 5)
00373         ACE_DEBUG ((LM_DEBUG,
00374                     "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to "
00375                     "proxy supplier %d; already shut down\n",
00376                     this->sequence_,
00377                     static_cast<int> (request_id),
00378                     ps->id()));
00379     }
00380 }

void TAO_Notify::Routing_Slip::enter_state_changed Routing_Slip_Guard guard  )  [private]
 

Definition at line 756 of file Routing_Slip.cpp.

References ACE_DEBUG, ACE_TEXT(), add_to_persist_queue(), all_deliveries_complete(), count_enter_changed_, DEBUG_LEVEL, enter_state_complete(), LM_DEBUG, Routing_Slip_Guard, and rssCHANGED.

Referenced by delivery_request_complete(), and persist_complete().

00757 {
00758   ++count_enter_changed_;
00759   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00760       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED\n"),
00761       this->sequence_
00762       ));
00763   // complete state change BEFORE initiating request to avoid
00764   // race condition if request finishes before state is stable.
00765   this->state_ = rssCHANGED;
00766   if (all_deliveries_complete ())
00767   {
00768     enter_state_complete (guard);
00769   }
00770   add_to_persist_queue (guard);
00771 }

void TAO_Notify::Routing_Slip::enter_state_changed_while_saving Routing_Slip_Guard guard  )  [private]
 

Definition at line 737 of file Routing_Slip.cpp.

References ACE_DEBUG, ACE_TEXT(), count_enter_changed_while_saving_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssCHANGED_WHILE_SAVING.

Referenced by delivery_request_complete().

00738 {
00739   ++count_enter_changed_while_saving_;
00740   ACE_UNUSED_ARG (guard);
00741   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00742       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED_WHILE_SAVING\n"),
00743       this->sequence_
00744       ));
00745   this->state_ = rssCHANGED_WHILE_SAVING;
00746 }

void TAO_Notify::Routing_Slip::enter_state_complete Routing_Slip_Guard guard  )  [private]
 

Definition at line 784 of file Routing_Slip.cpp.

References ACE_DEBUG, ACE_TEXT(), count_enter_complete_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssCOMPLETE.

Referenced by continue_state_changed(), and enter_state_changed().

00785 {
00786   ++count_enter_complete_;
00787   ACE_UNUSED_ARG (guard);
00788   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00789       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE\n"),
00790       this->sequence_
00791       ));
00792   this->state_ = rssCOMPLETE;
00793 }

void TAO_Notify::Routing_Slip::enter_state_complete_while_new Routing_Slip_Guard guard  )  [private]
 

Definition at line 610 of file Routing_Slip.cpp.

References ACE_DEBUG, ACE_TEXT(), count_enter_complete_while_new_, DEBUG_LEVEL, is_safe_, LM_DEBUG, Routing_Slip_Guard, rssCOMPLETE_WHILE_NEW, and until_safe_.

Referenced by continue_state_new().

00611 {
00612   ++count_enter_complete_while_new_;
00613   ACE_UNUSED_ARG (guard);
00614   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00615       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE_WHILE_NEW\n"),
00616       this->sequence_
00617       ));
00618   // allow the ConsumerProxy to return from the CORBA push call.
00619   if (! is_safe_)
00620   {
00621     is_safe_ = true;
00622     this->until_safe_.signal ();
00623   }
00624   this->state_ = rssCOMPLETE_WHILE_NEW;
00625 }

void TAO_Notify::Routing_Slip::enter_state_deleting Routing_Slip_Guard guard  )  [private]
 

Definition at line 796 of file Routing_Slip.cpp.

References ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), count_enter_deleting_, DEBUG_LEVEL, LM_DEBUG, ACE_Guard< ACE_LOCK >::release(), TAO_Notify::Routing_Slip_Persistence_Manager::remove(), Routing_Slip_Guard, rspm_, and rssDELETING.

Referenced by at_front_of_persist_queue().

00797 {
00798   ++count_enter_deleting_;
00799   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00800       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state DELETING\n"),
00801       this->sequence_
00802       ));
00803   this->state_ = rssDELETING;
00804   guard.release ();
00805   this->rspm_->remove ();
00806   guard.acquire (); // necessary?
00807 }

void TAO_Notify::Routing_Slip::enter_state_new Routing_Slip_Guard guard  )  [private]
 

Definition at line 589 of file Routing_Slip.cpp.

References ACE_DEBUG, ACE_TEXT(), add_to_persist_queue(), count_enter_new_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssNEW.

Referenced by route().

00590 {
00591   ++count_enter_new_;
00592   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00593       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state NEW\n"),
00594       this->sequence_
00595       ));
00596   this->state_ = rssNEW;
00597   add_to_persist_queue(guard);
00598 }

void TAO_Notify::Routing_Slip::enter_state_reloaded Routing_Slip_Guard guard  )  [private]
 

Definition at line 628 of file Routing_Slip.cpp.

References ACE_DEBUG, ACE_TEXT(), count_enter_reloaded_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssRELOADED.

00629 {
00630   ++count_enter_reloaded_;
00631   ACE_UNUSED_ARG (guard);
00632   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00633       ACE_TEXT ("(%P|%t) Routing Slip #&d: enter state RELOADED\n"),
00634       this->sequence_
00635       ));
00636   this->state_ = rssRELOADED;
00637 }

void TAO_Notify::Routing_Slip::enter_state_saved Routing_Slip_Guard guard  )  [private]
 

Definition at line 704 of file Routing_Slip.cpp.

References ACE_DEBUG, ACE_TEXT(), count_enter_saved_, DEBUG_LEVEL, LM_DEBUG, Routing_Slip_Guard, and rssSAVED.

Referenced by persist_complete(), and reconnect().

00705 {
00706   ++count_enter_saved_;
00707   ACE_UNUSED_ARG (guard);
00708   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00709       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVED\n"),
00710       this->sequence_
00711       ));
00712   this->state_ = rssSAVED;
00713 }

void TAO_Notify::Routing_Slip::enter_state_saving Routing_Slip_Guard guard  )  [private]
 

Definition at line 670 of file Routing_Slip.cpp.

References ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), ACE_OutputCDR::begin(), TAO_Notify::Routing_Slip_Queue::complete(), count_enter_saving_, create_persistence_manager(), DEBUG_LEVEL, enter_state_transient(), LM_DEBUG, marshal(), persistent_queue_, ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rspm_, rssSAVING, and TAO_Notify::Routing_Slip_Persistence_Manager::store().

Referenced by at_front_of_persist_queue().

00671 {
00672   ++count_enter_saving_;
00673   if (!create_persistence_manager ())
00674   {
00675     // Note This should actually be a throw (out of memory)
00676     // but we cheat and make this a transient event.
00677     this->persistent_queue_.complete ();
00678     enter_state_transient (guard);
00679   }
00680   else
00681   {
00682     if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00683         ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVING\n"),
00684         this->sequence_
00685         ));
00686     this->state_ = rssSAVING;
00687 
00688     TAO_OutputCDR event_cdr;
00689     this->event_->marshal (event_cdr);
00690 
00691     const ACE_Message_Block *event_mb = event_cdr.begin ();
00692     TAO_OutputCDR rs_cdr;
00693     marshal (rs_cdr);
00694     const ACE_Message_Block *rs_mb = rs_cdr.begin ();
00695 
00696     guard.release ();
00697     this->rspm_->store (*event_mb, *rs_mb);
00698 
00699     guard.acquire (); // necessary?
00700   }
00701 }

void TAO_Notify::Routing_Slip::enter_state_terminal Routing_Slip_Guard guard  )  [private]
 

Definition at line 810 of file Routing_Slip.cpp.

References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), count_enter_terminal_, DEBUG_LEVEL, LM_DEBUG, ACE_Strong_Bound_Ptr< X, ACE_LOCK >::reset(), Routing_Slip_Guard, rssTERMINAL, and this_ptr_.

Referenced by at_front_of_persist_queue(), continue_state_transient(), enter_state_transient(), and persist_complete().

00811 {
00812   ++count_enter_terminal_;
00813   ACE_UNUSED_ARG (guard);
00814   ACE_ASSERT( this->is_safe_);
00815   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00816       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TERMINAL\n"),
00817       this->sequence_
00818       ));
00819   this->state_ = rssTERMINAL;
00820   this->this_ptr_.reset ();
00821 }

void TAO_Notify::Routing_Slip::enter_state_transient Routing_Slip_Guard guard  )  [private]
 

Definition at line 640 of file Routing_Slip.cpp.

References ACE_DEBUG, ACE_TEXT(), all_deliveries_complete(), count_enter_transient_, DEBUG_LEVEL, enter_state_terminal(), is_safe_, LM_DEBUG, Routing_Slip_Guard, rssTRANSIENT, and until_safe_.

Referenced by enter_state_saving(), and route().

00641 {
00642   ++count_enter_transient_;
00643   ACE_UNUSED_ARG (guard);
00644   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00645       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TRANSIENT\n"),
00646       this->sequence_
00647       ));
00648   this->state_ = rssTRANSIENT;
00649   if (! is_safe_)
00650   {
00651     is_safe_ = true;
00652     this->until_safe_.signal ();
00653   }
00654   if (all_deliveries_complete ())
00655   {
00656     enter_state_terminal (guard);
00657   }
00658 }

void TAO_Notify::Routing_Slip::enter_state_updating Routing_Slip_Guard guard  )  [private]
 

Definition at line 716 of file Routing_Slip.cpp.

References ACE_ASSERT, ACE_DEBUG, ACE_TEXT(), ACE_Guard< ACE_LOCK >::acquire(), ACE_OutputCDR::begin(), count_enter_updating_, DEBUG_LEVEL, LM_DEBUG, marshal(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rspm_, rssUPDATING, and TAO_Notify::Routing_Slip_Persistence_Manager::update().

Referenced by at_front_of_persist_queue().

00717 {
00718   ++count_enter_updating_;
00719   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00720       ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state UPDATING\n"),
00721       this->sequence_
00722       ));
00723   this->state_ = rssUPDATING;
00724 
00725   TAO_OutputCDR rs_cdr;
00726   marshal (rs_cdr);
00727   const ACE_Message_Block *rs_mb = rs_cdr.begin ();
00728   guard.release ();
00729 
00730   ACE_ASSERT (this->rspm_ != 0);
00731   this->rspm_->update (*rs_mb);
00732   guard.acquire (); // necessary?
00733 }

const TAO_Notify_Event::Ptr & TAO_Notify::Routing_Slip::event  )  const
 

Definition at line 216 of file Routing_Slip.cpp.

Referenced by create().

00217 {
00218   return this->event_;
00219 }

void TAO_Notify::Routing_Slip::marshal TAO_OutputCDR cdr  )  [private]
 

Marshal into a CDR.

Definition at line 824 of file Routing_Slip.cpp.

References delivery_requests_, ACE_Array_Base< T >::get(), TAO_Notify::Delivery_Request::marshal(), ACE_Vector< T, DEFAULT_SIZE >::size(), and ACE_OutputCDR::write_ulong().

Referenced by enter_state_saving(), and enter_state_updating().

00825 {
00826   size_t request_count = this->delivery_requests_.size();
00827   cdr.write_ulong (request_count - this->complete_requests_);
00828   for (size_t nreq = 0; nreq < request_count; ++nreq)
00829   {
00830     Delivery_Request * request = this->delivery_requests_[nreq].get ();
00831     if (request != 0)
00832     {
00833       request->marshal (cdr);
00834     }
00835   }
00836 }

void TAO_Notify::Routing_Slip::persist_complete  )  [virtual]
 

The persistent storage has completed the last request.

Implements TAO_Notify::Persistent_Callback.

Definition at line 507 of file Routing_Slip.cpp.

References ACE_ASSERT, ACE_DEBUG, ACE_ERROR, ACE_TEXT(), TAO_Notify::Routing_Slip_Queue::complete(), DEBUG_LEVEL, enter_state_changed(), enter_state_saved(), enter_state_terminal(), is_safe_, LM_DEBUG, LM_ERROR, ACE_Guard< ACE_LOCK >::locked(), persistent_queue_, Routing_Slip_Guard, TAO_Notify::Routing_Slip_Ptr, rssCHANGED_WHILE_SAVING, rssDELETING, rssSAVING, rssUPDATING, and until_safe_.

00508 {
00509   // keep this object around til this method returns.
00510   Routing_Slip_Ptr me(this->this_ptr_);
00511   Routing_Slip_Guard guard (this->internals_);
00512   ACE_ASSERT (guard.locked ());
00513 
00514   // allow the ConsumerProxy to return from the CORBA push call.
00515   if (! is_safe_)
00516   {
00517     is_safe_ = true;
00518     this->until_safe_.signal ();
00519   }
00520 
00521   State state = this->state_;
00522   switch (state)
00523   {
00524     case rssSAVING:
00525     {
00526       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00527         ACE_TEXT ("(%P|%t) Routing Slip #%d: SAVING persist complete\n"),
00528         this->sequence_
00529         ));
00530       enter_state_saved(guard);
00531       break;
00532     }
00533     case rssCHANGED_WHILE_SAVING:
00534     {
00535       enter_state_changed (guard);
00536       break;
00537     }
00538     case rssUPDATING:
00539     {
00540       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00541         ACE_TEXT ("(%P|%t) Routing Slip #%d: UPDATING persist complete\n"),
00542         this->sequence_
00543         ));
00544       enter_state_saved (guard);
00545       break;
00546     }
00547     case rssDELETING:
00548     {
00549       if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00550         ACE_TEXT ("(%P|%t) Routing Slip #%d: DELETING persist complete\n"),
00551         this->sequence_
00552         ));
00553       enter_state_terminal (guard);
00554       break;
00555     }
00556     default:
00557     {
00558       ACE_ERROR ((LM_ERROR,
00559         ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected transition in state %d\n"),
00560         static_cast<int> (this->state_)
00561         ));
00562       break;
00563     }
00564   }
00565   this->persistent_queue_.complete ();
00566 }

void TAO_Notify::Routing_Slip::reconnect  ) 
 

Definition at line 902 of file Routing_Slip.cpp.

References ACE_CHECK, ACE_ENV_SINGLE_ARG_PARAMETER, ACE_Vector< T, DEFAULT_SIZE >::clear(), delivery_methods_, enter_state_saved(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, and ACE_Vector< T, DEFAULT_SIZE >::size().

00903 {
00904   Routing_Slip_Guard guard (this->internals_);
00905   enter_state_saved (guard);
00906   guard.release ();
00907   //@@todo is there a worker_task available to do this?
00908   size_t count = this->delivery_methods_.size ();
00909   for (size_t nmethod = 0; nmethod < count; ++nmethod)
00910   {
00911     this->delivery_methods_[nmethod]->execute (ACE_ENV_SINGLE_ARG_PARAMETER);
00912     ACE_CHECK;
00913   }
00914   this->delivery_methods_.clear ();
00915 }

void TAO_Notify::Routing_Slip::route TAO_Notify_ProxyConsumer pc,
bool  reliable_channel
 

Route this event to destinations must be the Action request after the routing slip is created.

Definition at line 232 of file Routing_Slip.cpp.

References ACE_ASSERT, ACE_DEBUG, ACE_ENV_ARG_PARAMETER, ACE_TEXT(), DEBUG_LEVEL, TAO_Notify::Delivery_Request_Ptr, delivery_requests_, enter_state_new(), enter_state_transient(), TAO_Notify_Object::execute_task(), LM_DEBUG, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_Guard< ACE_LOCK >::release(), Routing_Slip_Guard, rssCREATING, and ACE_Vector< T, DEFAULT_SIZE >::size().

00233 {
00234   ACE_ASSERT(pc != 0);
00235 
00236   TAO_Notify_ProxyConsumer::Ptr pcgrd(pc);
00237 
00238   Routing_Slip_Guard guard (this->internals_);
00239 
00240   size_t request_id = delivery_requests_.size ();
00241 
00242   if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
00243       ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: lookup, completed %d of %d\n"),
00244       this->sequence_,
00245       static_cast<int> (request_id),
00246       static_cast<int> (this->complete_requests_),
00247       static_cast<int> (this->delivery_requests_.size ())
00248       ));
00249 
00250   Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
00251   this->delivery_requests_.push_back (request);
00252   TAO_Notify_Method_Request_Lookup_Queueable method (request, pc);
00253 
00254   if (this->state_ == rssCREATING)
00255   {
00256     if (! reliable_channel)
00257     {
00258       enter_state_transient (guard);
00259     }
00260     else if (ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence") == 0)
00261     {
00262       enter_state_transient (guard);
00263     }
00264     else if (! this->event_->reliable().is_valid())
00265     {
00266       enter_state_new (guard);
00267     }
00268     else if (this->event_->reliable().value() == CosNotification::Persistent)
00269     {
00270       enter_state_new (guard);
00271     }
00272     else
00273     {
00274       enter_state_transient (guard);
00275     }
00276   }
00277   guard.release ();
00278   pc->execute_task (method ACE_ENV_ARG_PARAMETER);
00279 }

int TAO_Notify::Routing_Slip::sequence  )  const
 

Provide an identifying number for this Routing Slip to use in debug messages.

Definition at line 918 of file Routing_Slip.cpp.

References sequence_.

00919 {
00920   return this->sequence_;
00921 }

void TAO_Notify::Routing_Slip::set_rspm Routing_Slip_Persistence_Manager rspm  ) 
 

Definition at line 161 of file Routing_Slip.cpp.

References rspm_, and TAO_Notify::Routing_Slip_Persistence_Manager::set_callback().

Referenced by create_persistence_manager().

00162 {
00163   this->rspm_ = rspm;
00164   if (rspm_ != 0)
00165   {
00166     rspm->set_callback (this);
00167   }
00168 }

bool TAO_Notify::Routing_Slip::should_retry  )  const
 

Should delivery of this event be retried if it fails?

Definition at line 924 of file Routing_Slip.cpp.

References rssTRANSIENT.

00925 {
00926   // simple minded test: if it's transient, don't retry it
00927   // @@todo Eventually this should check timeout, discard policy, etc.
00928   return this->state_ != rssTRANSIENT;
00929 }

bool TAO_Notify::Routing_Slip::unmarshal TAO_Notify_EventChannelFactory ecf,
TAO_InputCDR rscdr
[private]
 

Marshal from CDR.

Definition at line 839 of file Routing_Slip.cpp.

References ACE_CATCHANY, ACE_DECLARE_NEW_ENV, ACE_ENDTRY, ACE_ENV_ARG_PARAMETER, ACE_NEW_THROW_EX, ACE_TRY, ACE_TRY_CHECK, delivery_methods_, TAO_Notify::Delivery_Request_Ptr, delivery_requests_, ACE_Vector< T, DEFAULT_SIZE >::push_back(), ACE_InputCDR::read_octet(), ACE_InputCDR::read_ulong(), ACE_Vector< T, DEFAULT_SIZE >::size(), this_ptr_, TAO_Notify_Method_Request_Lookup::unmarshal(), and TAO_Notify_Method_Request_Dispatch::unmarshal().

00840 {
00841   CORBA::ULong count = 0;
00842   cdr.read_ulong (count);
00843   for (size_t nreq = 0; nreq < count; ++nreq)
00844   {
00845     ACE_CDR::Octet code = 0;
00846     while (cdr.read_octet(code))
00847     {
00848       ACE_DECLARE_NEW_ENV;
00849       ACE_TRY
00850       {
00851         if (code == TAO_Notify_Method_Request_Dispatch::persistence_code)
00852         {
00853           Delivery_Request * prequest;
00854           ACE_NEW_THROW_EX (
00855             prequest,
00856             Delivery_Request(this_ptr_, this->delivery_requests_.size ()),
00857             CORBA::NO_MEMORY ());
00858           ACE_TRY_CHECK;
00859           Delivery_Request_Ptr request(prequest);
00860           TAO_Notify_Method_Request_Dispatch_Queueable * method =
00861             TAO_Notify_Method_Request_Dispatch::unmarshal (
00862               request,
00863               ecf,
00864               cdr
00865               ACE_ENV_ARG_PARAMETER);
00866           ACE_TRY_CHECK;
00867           if (method != 0)
00868           {
00869             this->delivery_requests_.push_back (request);
00870             this->delivery_methods_.push_back (method);
00871           }
00872         }
00873         else if (code == TAO_Notify_Method_Request_Lookup::persistence_code)
00874         {
00875           Delivery_Request_Ptr request(new Delivery_Request(this_ptr_, this->delivery_requests_.size ()));
00876           TAO_Notify_Method_Request_Lookup_Queueable * method =
00877               TAO_Notify_Method_Request_Lookup::unmarshal (
00878                 request,
00879                 ecf,
00880                 cdr
00881                 ACE_ENV_ARG_PARAMETER);
00882           ACE_TRY_CHECK
00883           if (method != 0)
00884           {
00885             this->delivery_requests_.push_back (request);
00886             this->delivery_methods_.push_back (method);
00887           }
00888         }
00889       }
00890       ACE_CATCHANY;
00891       {
00892         // @@todo should we log this?
00893         // just ignore failures
00894       }
00895       ACE_ENDTRY;
00896     }
00897   }
00898   return this->delivery_requests_.size () > 0;
00899 }

void TAO_Notify::Routing_Slip::wait_persist  ) 
 

Wait until the event/routing_slip has been saved at least once.

Definition at line 222 of file Routing_Slip.cpp.

References is_safe_, Routing_Slip_Guard, and until_safe_.

00223 {
00224   Routing_Slip_Guard guard (this->internals_);
00225   while (!this->is_safe_)
00226   {
00227     this->until_safe_.wait ();
00228   }
00229 }


Member Data Documentation

size_t TAO_Notify::Routing_Slip::complete_requests_ [private]
 

How many delivery requests are complete.

Definition at line 208 of file Routing_Slip.h.

Referenced by all_deliveries_complete(), and delivery_request_complete().

size_t TAO_Notify::Routing_Slip::count_continue_changed_ = 0 [static, private]
 

Definition at line 50 of file Routing_Slip.cpp.

Referenced by continue_state_changed(), and create().

size_t TAO_Notify::Routing_Slip::count_continue_changed_while_saving_ = 0 [static, private]
 

Definition at line 48 of file Routing_Slip.cpp.

Referenced by create().

size_t TAO_Notify::Routing_Slip::count_continue_new_ = 0 [static, private]
 

Definition at line 42 of file Routing_Slip.cpp.

Referenced by continue_state_new(), and create().

size_t TAO_Notify::Routing_Slip::count_continue_transient_ = 0 [static, private]
 

Definition at line 39 of file Routing_Slip.cpp.

Referenced by continue_state_transient(), and create().

size_t TAO_Notify::Routing_Slip::count_enter_changed_ = 0 [static, private]
 

Definition at line 49 of file Routing_Slip.cpp.

Referenced by create(), and enter_state_changed().

size_t TAO_Notify::Routing_Slip::count_enter_changed_while_saving_ = 0 [static, private]
 

Definition at line 47 of file Routing_Slip.cpp.

Referenced by create(), and enter_state_changed_while_saving().

size_t TAO_Notify::Routing_Slip::count_enter_complete_ = 0 [static, private]
 

Definition at line 51 of file Routing_Slip.cpp.

Referenced by create(), and enter_state_complete().

size_t TAO_Notify::Routing_Slip::count_enter_complete_while_new_ = 0 [static, private]
 

Definition at line 43 of file Routing_Slip.cpp.

Referenced by create(), and enter_state_complete_while_new().

size_t TAO_Notify::Routing_Slip::count_enter_deleting_ = 0 [static, private]
 

Definition at line 52 of file Routing_Slip.cpp.

Referenced by create(), and enter_state_deleting().

size_t TAO_Notify::Routing_Slip::count_enter_new_ = 0 [static, private]
 

Definition at line 41 of file Routing_Slip.cpp.

Referenced by create(), and enter_state_new().

size_t TAO_Notify::Routing_Slip::count_enter_reloaded_ = 0 [static, private]
 

Definition at line 40 of file Routing_Slip.cpp.

Referenced by create(), and enter_state_reloaded().

size_t TAO_Notify::Routing_Slip::count_enter_saved_ = 0 [static, private]
 

Definition at line 45 of file Routing_Slip.cpp.

Referenced by create(), and enter_state_saved().

size_t TAO_Notify::Routing_Slip::count_enter_saving_ = 0 [static, private]
 

Definition at line 44 of file Routing_Slip.cpp.

Referenced by create(), and enter_state_saving().

size_t TAO_Notify::Routing_Slip::count_enter_terminal_ = 0 [static, private]
 

Definition at line 53 of file Routing_Slip.cpp.

Referenced by create(), and enter_state_terminal().

size_t TAO_Notify::Routing_Slip::count_enter_transient_ = 0 [static, private]
 

Definition at line 38 of file Routing_Slip.cpp.

Referenced by create(), and enter_state_transient().

size_t TAO_Notify::Routing_Slip::count_enter_updating_ = 0 [static, private]
 

Definition at line 46 of file Routing_Slip.cpp.

Referenced by create(), and enter_state_updating().

Delivery_Method_Vec TAO_Notify::Routing_Slip::delivery_methods_ [private]
 

Methods that should be restarted during event recovery.

Definition at line 205 of file Routing_Slip.h.

Referenced by reconnect(), and unmarshal().

Delivery_Request_Vec TAO_Notify::Routing_Slip::delivery_requests_ [private]
 

A collection of delivery requests.

Definition at line 202 of file Routing_Slip.h.

Referenced by all_deliveries_complete(), delivery_request_complete(), dispatch(), marshal(), route(), and unmarshal().

TAO_Notify_Event::Ptr TAO_Notify::Routing_Slip::event_ [private]
 

Definition at line 180 of file Routing_Slip.h.

TAO_SYNCH_MUTEX TAO_Notify::Routing_Slip::internals_ [private]
 

Protection for internal information.

Definition at line 168 of file Routing_Slip.h.

bool TAO_Notify::Routing_Slip::is_safe_ [private]
 

true when event persistence qos is guaranteed

Definition at line 170 of file Routing_Slip.h.

Referenced by enter_state_complete_while_new(), enter_state_transient(), persist_complete(), and wait_persist().

Routing_Slip_Queue TAO_Notify::Routing_Slip::persistent_queue_ [static, private]
 

Referenced by add_to_persist_queue(), at_front_of_persist_queue(), enter_state_saving(), and persist_complete().

int TAO_Notify::Routing_Slip::routing_slip_sequence_ = 0 [static, private]
 

Definition at line 37 of file Routing_Slip.cpp.

Referenced by Routing_Slip().

Routing_Slip_Persistence_Manager* TAO_Notify::Routing_Slip::rspm_ [private]
 

Pointer to a Routing_Slip_Persistence_Manager.

Definition at line 211 of file Routing_Slip.h.

Referenced by create_persistence_manager(), enter_state_deleting(), enter_state_saving(), enter_state_updating(), and set_rspm().

int TAO_Notify::Routing_Slip::sequence_ [private]
 

Definition at line 213 of file Routing_Slip.h.

Referenced by Routing_Slip(), and sequence().

TAO_SYNCH_MUTEX TAO_Notify::Routing_Slip::sequence_lock_ [static, private]
 

Definition at line 36 of file Routing_Slip.cpp.

Referenced by Routing_Slip().

enum TAO_Notify::Routing_Slip::State TAO_Notify::Routing_Slip::state_ [private]
 

A mini-state machine to control persistence See external doc for circles and arrows.

Routing_Slip_Ptr TAO_Notify::Routing_Slip::this_ptr_ [private]
 

Smart pointer to this object Provides continuity between smart pointers and "Routing_Slip::this" Also lets the Routing_Slip manage its own minimum lifetime.

Definition at line 177 of file Routing_Slip.h.

Referenced by enter_state_terminal(), and unmarshal().

ACE_SYNCH_CONDITION TAO_Notify::Routing_Slip::until_safe_ [private]
 

signalled when is_safe_ goes true

Definition at line 172 of file Routing_Slip.h.

Referenced by enter_state_complete_while_new(), enter_state_transient(), persist_complete(), and wait_persist().


The documentation for this class was generated from the following files:
Generated on Thu Nov 9 13:34:21 2006 for TAO_CosNotification by doxygen 1.3.6