
TAO_Notify::Routing_Slip Class Reference

Class which manages the delivery of events to their destinations.

#include <Routing_Slip.h>

Inherits TAO_Notify::Persistent_Callback.


Public Member Functions

void set_rspm (Routing_Slip_Persistence_Manager *rspm)
void reconnect (void)
virtual ~Routing_Slip ()
 destructor (should be private but that inspires compiler wars)
void route (TAO_Notify_ProxyConsumer *pc, bool reliable_channel)
void dispatch (TAO_Notify_ProxySupplier *proxy_supplier, bool filter)
 Schedule delivery to a consumer via a proxy supplier.
void wait_persist ()
 Wait until the event/routing_slip has been saved at least once.
void delivery_request_complete (size_t request_id)
 A delivery request has been satisfied.
void at_front_of_persist_queue ()
 This Routing_Slip reached the front of the persistence queue.
virtual void persist_complete ()
 The persistent storage has completed the last request.
const TAO_Notify_Event::Ptr & event () const
int sequence () const
 Provide an identifying number for this Routing Slip to use in debug messages.
bool should_retry () const
 Should delivery of this event be retried if it fails?

Static Public Member Functions

static Routing_Slip_Ptr create (const TAO_Notify_Event::Ptr &event)
 "Factory" method for normal use.
static Routing_Slip_Ptr create (TAO_Notify_EventChannelFactory &ecf, Routing_Slip_Persistence_Manager *rspm)
 "Factory" method for use during reload from persistent storage.

Private Types

enum  State {
  rssCREATING, rssTRANSIENT, rssRELOADED, rssNEW,
  rssCOMPLETE_WHILE_NEW, rssSAVING, rssSAVED, rssUPDATING,
  rssCHANGED_WHILE_SAVING, rssCHANGED, rssCOMPLETE, rssDELETING,
  rssTERMINAL
}
typedef ACE_Guard< TAO_SYNCH_MUTEX > Routing_Slip_Guard

Private Member Functions

void enter_state_transient (Routing_Slip_Guard &guard)
void continue_state_transient (Routing_Slip_Guard &guard)
void enter_state_reloaded (Routing_Slip_Guard &guard)
void enter_state_new (Routing_Slip_Guard &guard)
void continue_state_new (Routing_Slip_Guard &guard)
void enter_state_complete_while_new (Routing_Slip_Guard &guard)
void enter_state_saving (Routing_Slip_Guard &guard)
void enter_state_saved (Routing_Slip_Guard &guard)
void enter_state_updating (Routing_Slip_Guard &guard)
void enter_state_changed_while_saving (Routing_Slip_Guard &guard)
void continue_state_changed_while_saving (Routing_Slip_Guard &guard)
void enter_state_changed (Routing_Slip_Guard &guard)
void continue_state_changed (Routing_Slip_Guard &guard)
void enter_state_complete (Routing_Slip_Guard &guard)
void enter_state_deleting (Routing_Slip_Guard &guard)
void enter_state_terminal (Routing_Slip_Guard &guard)
bool create_persistence_manager ()
 Routing_Slip (const TAO_Notify_Event::Ptr &event)
 Private constructor for use by create method.
bool all_deliveries_complete () const
 Test to see if all deliveries are complete.
void add_to_persist_queue (Routing_Slip_Guard &guard)
 This routing_slip needs to be saved.
void marshal (TAO_OutputCDR &cdr)
 Marshal into a CDR.
bool unmarshal (TAO_Notify_EventChannelFactory &ecf, TAO_InputCDR &rscdr)
 Unmarshal from a CDR.

Private Attributes

TAO_SYNCH_MUTEX internals_
 Protection for internal information.
bool is_safe_
 true when event persistence qos is guaranteed
ACE_SYNCH_CONDITION until_safe_
 signalled when is_safe_ goes true
Routing_Slip_Ptr this_ptr_
TAO_Notify_Event::Ptr event_
enum TAO_Notify::Routing_Slip::State state_
Delivery_Request_Vec delivery_requests_
 A collection of delivery requests.
Delivery_Method_Vec delivery_methods_
 Methods that should be restarted during event recovery.
size_t complete_requests_
 How many delivery requests are complete.
Routing_Slip_Persistence_Manager * rspm_
 Pointer to a Routing_Slip_Persistence_Manager.
int sequence_

Static Private Attributes

static TAO_SYNCH_MUTEX sequence_lock_
static int routing_slip_sequence_ = 0
static size_t count_enter_transient_ = 0
static size_t count_continue_transient_ = 0
static size_t count_enter_reloaded_ = 0
static size_t count_enter_new_ = 0
static size_t count_continue_new_ = 0
static size_t count_enter_complete_while_new_ = 0
static size_t count_enter_saving_ = 0
static size_t count_enter_saved_ = 0
static size_t count_enter_updating_ = 0
static size_t count_enter_changed_while_saving_ = 0
static size_t count_continue_changed_while_saving_ = 0
static size_t count_enter_changed_ = 0
static size_t count_continue_changed_ = 0
static size_t count_enter_complete_ = 0
static size_t count_enter_deleting_ = 0
static size_t count_enter_terminal_ = 0
static Routing_Slip_Queue persistent_queue_

Detailed Description

Class which manages the delivery of events to their destinations.

Interacts with persistent storage to provide reliable delivery.

Definition at line 66 of file Routing_Slip.h.


Member Typedef Documentation

typedef ACE_Guard< TAO_SYNCH_MUTEX > TAO_Notify::Routing_Slip::Routing_Slip_Guard [private]

Definition at line 68 of file Routing_Slip.h.


Member Enumeration Documentation

enum TAO_Notify::Routing_Slip::State [private]

A mini-state machine to control persistence. See external doc for circles and arrows.

Enumerator:
rssCREATING 
rssTRANSIENT 
rssRELOADED 
rssNEW 
rssCOMPLETE_WHILE_NEW 
rssSAVING 
rssSAVED 
rssUPDATING 
rssCHANGED_WHILE_SAVING 
rssCHANGED 
rssCOMPLETE 
rssDELETING 
rssTERMINAL 

Definition at line 183 of file Routing_Slip.h.

  {
    rssCREATING,
    rssTRANSIENT,
    rssRELOADED,
    rssNEW,
    rssCOMPLETE_WHILE_NEW,
    rssSAVING,
    rssSAVED,
    rssUPDATING,
    rssCHANGED_WHILE_SAVING,
    rssCHANGED,
    rssCOMPLETE,
    rssDELETING,
    rssTERMINAL
  } state_;
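
The external state diagram is not reproduced on this page, but part of it can be read off the member functions documented below. The following is a minimal sketch, not TAO code: `next_state`, `Event`, and the scoped `State` enum are hypothetical names that tabulate the transitions taken in at_front_of_persist_queue() and persist_complete(). It deliberately ignores two refinements visible in the real code: entering SAVING falls back to TRANSIENT when no persistence manager can be created, and entering CHANGED moves straight on to COMPLETE when all deliveries have already finished.

```cpp
#include <optional>

// Illustrative mirror of Routing_Slip::State (names are hypothetical).
enum class State {
  Creating, Transient, Reloaded, New, CompleteWhileNew, Saving, Saved,
  Updating, ChangedWhileSaving, Changed, Complete, Deleting, Terminal
};

// Hypothetical event tags; the real class uses separate member functions.
enum class Event { FrontOfQueue, PersistComplete };

// Returns the state entered for (s, e), or std::nullopt where the real
// code logs an "unexpected" error and leaves the state unchanged.
std::optional<State> next_state(State s, Event e) {
  switch (e) {
    case Event::FrontOfQueue:  // at_front_of_persist_queue()
      switch (s) {
        case State::New:              return State::Saving;
        case State::CompleteWhileNew: return State::Terminal;
        case State::Changed:          return State::Updating;
        case State::Complete:         return State::Deleting;
        default:                      return std::nullopt;
      }
    case Event::PersistComplete:  // persist_complete()
      switch (s) {
        case State::Saving:             return State::Saved;
        case State::ChangedWhileSaving: return State::Changed;
        case State::Updating:           return State::Saved;
        case State::Deleting:           return State::Terminal;
        default:                        return std::nullopt;
      }
  }
  return std::nullopt;
}
```

For example, `next_state(State::New, Event::FrontOfQueue)` yields `State::Saving`, while a slip that is already `Saved` has no transition for `FrontOfQueue`.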


Constructor & Destructor Documentation

TAO_Notify::Routing_Slip::~Routing_Slip (  )  [virtual]

destructor (should be private but that inspires compiler wars)

Definition at line 185 of file Routing_Slip.cpp.

{
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: destructor\n"),
      this->sequence_
      ));
}

TAO_Notify::Routing_Slip::Routing_Slip ( const TAO_Notify_Event::Ptr & event )  [private]

Private constructor for use by create method.

Definition at line 167 of file Routing_Slip.cpp.

  : is_safe_ (false)
  , until_safe_ (internals_)
  , this_ptr_ (0)
  , event_(event)
  , state_ (rssCREATING)
  , complete_requests_ (0)
  , rspm_ (0)
{
  Routing_Slip_Guard guard (sequence_lock_);
  this->sequence_ = ++routing_slip_sequence_;
  if (DEBUG_LEVEL > 1) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: constructor\n"),
      this->sequence_
      ));
}


Member Function Documentation

void TAO_Notify::Routing_Slip::add_to_persist_queue ( Routing_Slip_Guard & guard )  [private]

This routing_slip needs to be saved.

Definition at line 582 of file Routing_Slip.cpp.

{
  guard.release ();
  this->persistent_queue_.add (this->this_ptr_);
}

bool TAO_Notify::Routing_Slip::all_deliveries_complete (  )  const [private]

Test to see if all deliveries are complete.

Definition at line 576 of file Routing_Slip.cpp.

{
  return this->complete_requests_ == this->delivery_requests_.size ();
}

void TAO_Notify::Routing_Slip::at_front_of_persist_queue (  ) 

This Routing_Slip reached the front of the persistence queue.

Definition at line 453 of file Routing_Slip.cpp.

{
  Routing_Slip_Guard guard (this->internals_);
  State state = this->state_;
  switch (state)
  {
    case rssNEW:
    {
      if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
        ACE_TEXT ("(%P|%t) Routing Slip #%d: NEW Reached front of queue\n"),
        this->sequence_
        ));
      enter_state_saving (guard);
      break;
    }
    case rssCOMPLETE_WHILE_NEW:
    {
      if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
        ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE_WHILE_NEW Reached front of queue\n"),
        this->sequence_
        ));
      guard.release ();
      this->persistent_queue_.complete ();
      enter_state_terminal (guard);
      break;
    }
    case rssCHANGED:
    {
      if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
        ACE_TEXT ("(%P|%t) Routing Slip #%d: CHANGED Reached front of queue\n"),
        this->sequence_
        ));
      enter_state_updating (guard);
      break;
    }
    case rssCOMPLETE:
    {
      if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
        ACE_TEXT ("(%P|%t) Routing Slip #%d: COMPLETE Reached front of queue\n"),
        this->sequence_
        ));
      enter_state_deleting (guard);
      break;
    }
    default:
    {
      ACE_ERROR ((LM_ERROR,
        ACE_TEXT ("(%P|%t) Routing Slip %d: Unexpected at_front_of_persist_queue in state %d\n"),
        this->sequence_,
        static_cast<int> (this->state_)
        ));
      break;
    }
  }
}

void TAO_Notify::Routing_Slip::continue_state_changed ( Routing_Slip_Guard & guard )  [private]

Definition at line 783 of file Routing_Slip.cpp.

{
  ++count_continue_changed_;
  if (all_deliveries_complete ())
  {
    enter_state_complete (guard);
  }
  else
  {
    guard.release ();
  }
}

void TAO_Notify::Routing_Slip::continue_state_changed_while_saving ( Routing_Slip_Guard & guard )  [private]

Definition at line 758 of file Routing_Slip.cpp.

{
  // no action necessary
  guard.release ();
}

void TAO_Notify::Routing_Slip::continue_state_new ( Routing_Slip_Guard & guard )  [private]

Definition at line 604 of file Routing_Slip.cpp.

{
  ++count_continue_new_;
  if (all_deliveries_complete ())
  {
    this->enter_state_complete_while_new (guard);
  }
  guard.release ();
}

void TAO_Notify::Routing_Slip::continue_state_transient ( Routing_Slip_Guard & guard )  [private]

Definition at line 668 of file Routing_Slip.cpp.

{
  ++count_continue_transient_;
  if (all_deliveries_complete ())
  {
    enter_state_terminal (guard);
  }
  else
  {
    guard.release ();
  }
}

Routing_Slip_Ptr TAO_Notify::Routing_Slip::create ( const TAO_Notify_Event::Ptr & event )  [static]

"Factory" method for normal use.

Definition at line 57 of file Routing_Slip.cpp.

{
  Routing_Slip * prs;
  ACE_NEW_THROW_EX (prs, Routing_Slip (event), CORBA::NO_MEMORY ());
  Routing_Slip_Ptr result(prs);
  result->this_ptr_ = result; // let the pointers touch so they use the same ref count

  // note we don't care about ultra-precise stats, so no guard for these
  if (DEBUG_LEVEL > 8 && ((result->sequence_ % 100) == 0))
  {
    ACE_ERROR ((LM_ERROR,
      ACE_TEXT ("(%P|%t) Routing_Slip_Statistics\n")
      ACE_TEXT ("  enter_transient              \t%d\n")
      ACE_TEXT ("  continue_transient           \t%d\n")
      ACE_TEXT ("  enter_reloaded               \t%d\n")
      ACE_TEXT ("  enter_new                    \t%d\n")
      ACE_TEXT ("  continue_new                 \t%d\n")
      ACE_TEXT ("  enter_complete_while_new     \t%d\n")
      ACE_TEXT ("  enter_saving                 \t%d\n")
      ACE_TEXT ("  enter_saved                  \t%d\n")
      ACE_TEXT ("  enter_updating               \t%d\n")
      ACE_TEXT ("  enter_changed_while_saving   \t%d\n")
      ACE_TEXT ("  continue_changed_while_saving\t%d\n")
      ACE_TEXT ("  enter_changed                \t%d\n")
      ACE_TEXT ("  continue_changed             \t%d\n")
      ACE_TEXT ("  enter_complete               \t%d\n")
      ACE_TEXT ("  enter_deleting               \t%d\n")
      ACE_TEXT ("  enter_terminal               \t%d\n")
      , static_cast<int> (count_enter_transient_)
      , static_cast<int> (count_continue_transient_)
      , static_cast<int> (count_enter_reloaded_)
      , static_cast<int> (count_enter_new_)
      , static_cast<int> (count_continue_new_)
      , static_cast<int> (count_enter_complete_while_new_)
      , static_cast<int> (count_enter_saving_)
      , static_cast<int> (count_enter_saved_)
      , static_cast<int> (count_enter_updating_)
      , static_cast<int> (count_enter_changed_while_saving_)
      , static_cast<int> (count_continue_changed_while_saving_)
      , static_cast<int> (count_enter_changed_)
      , static_cast<int> (count_continue_changed_)
      , static_cast<int> (count_enter_complete_)
      , static_cast<int> (count_enter_deleting_)
      , static_cast<int> (count_enter_terminal_)
      ));
  }
  return result;
}
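
create() stores a copy of the returned smart pointer in this_ptr_, so the routing slip holds a reference to itself until enter_state_terminal() resets it. The following is a minimal sketch of that self-ownership idiom, assuming std::shared_ptr as a stand-in for Routing_Slip_Ptr (which is a TAO-specific smart pointer type); `Slip`, `finish`, and `live_count` are hypothetical names used only for illustration.

```cpp
#include <memory>

class Slip {
public:
  // Factory: the only way to construct a Slip, mirroring the private
  // constructor plus static create() arrangement.
  static std::shared_ptr<Slip> create() {
    std::shared_ptr<Slip> result(new Slip());
    result->self_ = result;  // both pointers share one reference count
    return result;
  }

  // Drop the self-reference (the analogue of reaching the terminal state).
  // The object is destroyed once no external owner remains.
  void finish() { self_.reset(); }

  static int live_count() { return live_; }

  ~Slip() { --live_; }

private:
  Slip() { ++live_; }

  std::shared_ptr<Slip> self_;  // keeps the object alive until finish()
  static int live_;             // number of Slip objects currently alive
};

int Slip::live_ = 0;
```

Because the object can be destroyed as soon as the self-reference is dropped, a caller should hold its own pointer for the duration of the call, which is what persist_complete() does with its local `me` copy.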

Routing_Slip_Ptr TAO_Notify::Routing_Slip::create ( TAO_Notify_EventChannelFactory & ecf,
Routing_Slip_Persistence_Manager * rspm
) [static]

"Factory" method for use during reload from persistent storage.

Definition at line 108 of file Routing_Slip.cpp.

{
  Routing_Slip_Ptr result;
  ACE_Message_Block * event_mb = 0;
  ACE_Message_Block * rs_mb = 0;
  try
    {
      if (rspm->reload (event_mb, rs_mb))
      {
        TAO_InputCDR cdr_event (event_mb);
        TAO_Notify_Event::Ptr event (TAO_Notify_Event::unmarshal (cdr_event));
        if (event.isSet())
        {
          result = create (event);
          TAO_InputCDR cdr_rs (rs_mb);
          if ( result->unmarshal (ecf, cdr_rs))
          {
            result->set_rspm (rspm);
          }
          else
          {
            ACE_ERROR ((LM_ERROR,
              ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for routing slip.\n")
              ));
            result.reset ();
          }
        }
        else
        {
          ACE_ERROR ((LM_ERROR,
            ACE_TEXT ("(%P|%t) Routing_Slip::create: Unmarshalling failed for event.\n")
            ));
        }
      }
    }
  catch (const CORBA::Exception&)
    {
      ACE_ERROR ((LM_ERROR,
        ACE_TEXT ("(%P|%t) Routing_Slip::create: Exception reloading event.\n")
        ));
    }
  delete event_mb;
  delete rs_mb;

  return result;
}

bool TAO_Notify::Routing_Slip::create_persistence_manager (  )  [private]

Definition at line 194 of file Routing_Slip.cpp.

{
  if (this->rspm_ == 0)
  {
    Event_Persistence_Strategy * strategy =
      ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence");
    if (strategy != 0)
    {
      Event_Persistence_Factory * factory = strategy->get_factory ();
      if (factory != 0)
      {
        set_rspm (factory->create_routing_slip_persistence_manager(this));
      }
    }
  }
  return this->rspm_ != 0;
}

void TAO_Notify::Routing_Slip::delivery_request_complete ( size_t  request_id  ) 

A delivery request has been satisfied.

Definition at line 388 of file Routing_Slip.cpp.

{
  Routing_Slip_Guard guard (this->internals_);
  ACE_ASSERT (request_id < this->delivery_requests_.size ());
  // reset the pointer to allow the delivery_request to be deleted.
  this->delivery_requests_[request_id].reset ();
  this->complete_requests_ += 1;

  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: delivery_request_complete #%d: completed %d of %d\n"),
      this->sequence_,
      static_cast<int> (request_id),
      static_cast<int> (this->complete_requests_),
      static_cast<int> (this->delivery_requests_.size ())
      ));
  State state = this->state_;
  switch (state)
  {
    case rssTRANSIENT:
    {
      continue_state_transient (guard);
      break;
    }
    case rssNEW:
    {
      continue_state_new (guard);
      break;
    }
    case rssSAVING:
    {
      enter_state_changed_while_saving (guard);
      break;
    }
    case rssUPDATING:
    {
      enter_state_changed_while_saving (guard);
      break;
    }
    case rssSAVED:
    {
      enter_state_changed (guard);
      break;
    }
    case rssCHANGED_WHILE_SAVING:
    {
      continue_state_changed_while_saving (guard);
      break;
    }
    case rssCHANGED:
    {
      continue_state_changed (guard);
      break;
    }
    default:
    {
      ACE_ERROR ((LM_ERROR,
        ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected delivery_request_complete in state %d\n"),
        static_cast<int> (this->state_)
        ));
      break;
    }
  }
}

void TAO_Notify::Routing_Slip::dispatch ( TAO_Notify_ProxySupplier * proxy_supplier,
bool  filter
)

Schedule delivery to a consumer via a proxy supplier.

Parameters:
proxy_supplier the proxy supplier that will deliver the event
filter should consumer-based filtering be applied?

Definition at line 336 of file Routing_Slip.cpp.

{
  // cannot be the first action
  ACE_ASSERT (this->state_ != rssCREATING);

  TAO_Notify_ProxySupplier::Ptr psgrd(ps);
  Routing_Slip_Guard guard (this->internals_);

  size_t request_id = delivery_requests_.size ();

  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: Dispatch %s; completed %d of %d\n"),
      this->sequence_,
      static_cast<int> (request_id),
      filter ? ACE_TEXT ("Filter") : ACE_TEXT ("No Filter"),
      static_cast<int> (this->complete_requests_),
      static_cast<int> (this->delivery_requests_.size ())
      ));

  Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
  if (! ps->has_shutdown() )
    {
      this->delivery_requests_.push_back (request);
      TAO_Notify_Method_Request_Dispatch_No_Copy method (request, ps, filter);
      guard.release ();
      if (DEBUG_LEVEL > 8)
        ACE_DEBUG ((LM_DEBUG,
                    "(%P|%t) Routing Slip #%d: dispatching Delivery_Request %d to "
                    "proxy supplier %d\n",
                    this->sequence_,
                    static_cast<int> (request_id),
                    ps->id()));
      ps->execute_task (method);
    }
  else
    {
      if (DEBUG_LEVEL > 5)
        ACE_DEBUG ((LM_DEBUG,
                    "(%P|%t) Routing Slip #%d: not dispatching Delivery_Request %d to "
                    "proxy supplier %d; already shut down\n",
                    this->sequence_,
                    static_cast<int> (request_id),
                    ps->id()));
    }
}

void TAO_Notify::Routing_Slip::enter_state_changed ( Routing_Slip_Guard & guard )  [private]

Definition at line 765 of file Routing_Slip.cpp.

{
  ++count_enter_changed_;
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED\n"),
      this->sequence_
      ));
  // complete state change BEFORE initiating request to avoid
  // race condition if request finishes before state is stable.
  this->state_ = rssCHANGED;
  if (all_deliveries_complete ())
  {
    enter_state_complete (guard);
  }
  add_to_persist_queue (guard);
}

void TAO_Notify::Routing_Slip::enter_state_changed_while_saving ( Routing_Slip_Guard & guard )  [private]

Definition at line 746 of file Routing_Slip.cpp.

{
  ++count_enter_changed_while_saving_;
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state CHANGED_WHILE_SAVING\n"),
      this->sequence_
      ));
  this->state_ = rssCHANGED_WHILE_SAVING;
  guard.release ();
}

void TAO_Notify::Routing_Slip::enter_state_complete ( Routing_Slip_Guard & guard )  [private]

Definition at line 797 of file Routing_Slip.cpp.

{
  ++count_enter_complete_;
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE\n"),
      this->sequence_
      ));
  this->state_ = rssCOMPLETE;
  guard.release ();
}

void TAO_Notify::Routing_Slip::enter_state_complete_while_new ( Routing_Slip_Guard & guard )  [private]

Definition at line 614 of file Routing_Slip.cpp.

{
  ++count_enter_complete_while_new_;
  ACE_UNUSED_ARG (guard);
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state COMPLETE_WHILE_NEW\n"),
      this->sequence_
      ));
  // allow the ConsumerProxy to return from the CORBA push call.
  if (! is_safe_)
  {
    is_safe_ = true;
    this->until_safe_.signal ();
  }
  this->state_ = rssCOMPLETE_WHILE_NEW;
}

void TAO_Notify::Routing_Slip::enter_state_deleting ( Routing_Slip_Guard & guard )  [private]

Definition at line 809 of file Routing_Slip.cpp.

{
  ++count_enter_deleting_;
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state DELETING\n"),
      this->sequence_
      ));
  this->state_ = rssDELETING;
  guard.release ();
  this->rspm_->remove ();
}

void TAO_Notify::Routing_Slip::enter_state_new ( Routing_Slip_Guard & guard )  [private]

Definition at line 592 of file Routing_Slip.cpp.

{
  ++count_enter_new_;
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state NEW\n"),
      this->sequence_
      ));
  this->state_ = rssNEW;
  add_to_persist_queue(guard);
}

void TAO_Notify::Routing_Slip::enter_state_reloaded ( Routing_Slip_Guard & guard )  [private]

Definition at line 632 of file Routing_Slip.cpp.

{
  ++count_enter_reloaded_;
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state RELOADED\n"),
      this->sequence_
      ));
  this->state_ = rssRELOADED;
  guard.release();
}

void TAO_Notify::Routing_Slip::enter_state_saved ( Routing_Slip_Guard & guard )  [private]

Definition at line 714 of file Routing_Slip.cpp.

{
  ++count_enter_saved_;
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVED\n"),
      this->sequence_
      ));
  this->state_ = rssSAVED;
  guard.release ();
}

void TAO_Notify::Routing_Slip::enter_state_saving ( Routing_Slip_Guard & guard )  [private]

Definition at line 681 of file Routing_Slip.cpp.

{
  ++count_enter_saving_;
  if (!create_persistence_manager ())
  {
    // Note This should actually be a throw (out of memory)
    // but we cheat and make this a transient event.
    guard.release ();
    this->persistent_queue_.complete ();
    enter_state_transient (guard);
  }
  else
  {
    if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
        ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state SAVING\n"),
        this->sequence_
        ));
    this->state_ = rssSAVING;

    TAO_OutputCDR event_cdr;
    this->event_->marshal (event_cdr);

    const ACE_Message_Block *event_mb = event_cdr.begin ();
    TAO_OutputCDR rs_cdr;
    marshal (rs_cdr);
    const ACE_Message_Block *rs_mb = rs_cdr.begin ();

    guard.release ();
    this->rspm_->store (*event_mb, *rs_mb);
  }
}

void TAO_Notify::Routing_Slip::enter_state_terminal ( Routing_Slip_Guard & guard )  [private]

Definition at line 822 of file Routing_Slip.cpp.

{
  ++count_enter_terminal_;
  ACE_ASSERT( this->is_safe_);
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TERMINAL\n"),
      this->sequence_
      ));
  this->state_ = rssTERMINAL;
  this->this_ptr_.reset ();
  guard.release ();
}

void TAO_Notify::Routing_Slip::enter_state_transient ( Routing_Slip_Guard & guard )  [private]

Definition at line 644 of file Routing_Slip.cpp.

{
  ++count_enter_transient_;
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state TRANSIENT\n"),
      this->sequence_
      ));
  this->state_ = rssTRANSIENT;
  if (! is_safe_)
  {
    is_safe_ = true;
    this->until_safe_.signal ();
  }
  if (all_deliveries_complete ())
  {
    enter_state_terminal (guard);
  }
  else
  {
    guard.release ();
  }
}

void TAO_Notify::Routing_Slip::enter_state_updating ( Routing_Slip_Guard & guard )  [private]

Definition at line 726 of file Routing_Slip.cpp.

{
  ++count_enter_updating_;
  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: enter state UPDATING\n"),
      this->sequence_
      ));
  this->state_ = rssUPDATING;

  TAO_OutputCDR rs_cdr;
  marshal (rs_cdr);
  const ACE_Message_Block *rs_mb = rs_cdr.begin ();
  guard.release ();

  ACE_ASSERT (this->rspm_ != 0);
  this->rspm_->update (*rs_mb);
}

const TAO_Notify_Event::Ptr & TAO_Notify::Routing_Slip::event (  )  const

Definition at line 213 of file Routing_Slip.cpp.

{
  return this->event_;
}

void TAO_Notify::Routing_Slip::marshal ( TAO_OutputCDR & cdr )  [private]

Marshal into a CDR.

Definition at line 836 of file Routing_Slip.cpp.

{
  size_t request_count = this->delivery_requests_.size();
  cdr.write_ulong (
    ACE_Utils::truncate_cast<CORBA::ULong> (request_count - this->complete_requests_));
  for (size_t nreq = 0; nreq < request_count; ++nreq)
  {
    Delivery_Request * request = this->delivery_requests_[nreq].get ();
    if (request != 0)
    {
      request->marshal (cdr);
    }
  }
}
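
marshal() first writes the number of requests that are still pending (total minus completed) and then marshals only those slots that delivery_request_complete() has not yet reset. A minimal sketch of that layout follows, assuming a plain std::vector of 32-bit words in place of the CDR stream; `Request` and `marshal_pending` are hypothetical names, and each pending request is reduced to a single id for brevity.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for Delivery_Request: only an id is "marshalled".
struct Request {
  std::uint32_t id;
};

// Writes [pending_count, id, id, ...] where completed (null) slots are skipped.
// `complete` is the number of slots that have already been reset.
std::vector<std::uint32_t> marshal_pending(
    const std::vector<std::unique_ptr<Request>>& requests,
    std::size_t complete) {
  std::vector<std::uint32_t> out;
  out.push_back(static_cast<std::uint32_t>(requests.size() - complete));
  for (const auto& request : requests) {
    if (request) {  // a null slot means that delivery already completed
      out.push_back(request->id);
    }
  }
  return out;
}
```

Writing the pending count up front lets the reader of the stream know how many records to expect without scanning for an end marker.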

void TAO_Notify::Routing_Slip::persist_complete (  )  [virtual]

The persistent storage has completed the last request.

Implements TAO_Notify::Persistent_Callback.

Definition at line 510 of file Routing_Slip.cpp.

{
  // keep this object around til this method returns.
  Routing_Slip_Ptr me(this->this_ptr_);
  Routing_Slip_Guard guard (this->internals_);
  ACE_ASSERT (guard.locked ());

  // allow the ConsumerProxy to return from the CORBA push call.
  if (! is_safe_)
  {
    is_safe_ = true;
    this->until_safe_.signal ();
  }

  State state = this->state_;
  switch (state)
  {
    case rssSAVING:
    {
      if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
        ACE_TEXT ("(%P|%t) Routing Slip #%d: SAVING persist complete\n"),
        this->sequence_
        ));
      enter_state_saved(guard);
      break;
    }
    case rssCHANGED_WHILE_SAVING:
    {
      enter_state_changed (guard);
      break;
    }
    case rssUPDATING:
    {
      if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
        ACE_TEXT ("(%P|%t) Routing Slip #%d: UPDATING persist complete\n"),
        this->sequence_
        ));
      enter_state_saved (guard);
      break;
    }
    case rssDELETING:
    {
      if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
        ACE_TEXT ("(%P|%t) Routing Slip #%d: DELETING persist complete\n"),
        this->sequence_
        ));
      enter_state_terminal (guard);
      break;
    }
    default:
    {
      ACE_ERROR ((LM_ERROR,
        ACE_TEXT ("(%P|%t) Notification Service Routing Slip: Unexpected transition in state %d\n"),
        static_cast<int> (this->state_)
        ));
      guard.release ();
      break;
    }
  }
  this->persistent_queue_.complete ();
}

void TAO_Notify::Routing_Slip::reconnect ( void   ) 

Definition at line 908 of file Routing_Slip.cpp.

{
  Routing_Slip_Guard guard (this->internals_);
  enter_state_saved (guard);

  //@@todo is there a worker_task available to do this?
  size_t count = this->delivery_methods_.size ();
  for (size_t nmethod = 0; nmethod < count; ++nmethod)
  {
    this->delivery_methods_[nmethod]->execute ();
  }
  this->delivery_methods_.clear ();
}

void TAO_Notify::Routing_Slip::route ( TAO_Notify_ProxyConsumer * pc,
bool  reliable_channel
)

Route this event to its destinations. This must be the first action requested after the routing slip is created.

Definition at line 229 of file Routing_Slip.cpp.

{
  ACE_ASSERT(pc != 0);

  TAO_Notify_ProxyConsumer::Ptr pcgrd(pc);

  Routing_Slip_Guard guard (this->internals_);

  size_t request_id = delivery_requests_.size ();

  if (DEBUG_LEVEL > 8) ACE_DEBUG ((LM_DEBUG,
      ACE_TEXT ("(%P|%t) Routing Slip #%d: add Delivery_Request #%d: lookup, completed %d of %d\n"),
      this->sequence_,
      static_cast<int> (request_id),
      static_cast<int> (this->complete_requests_),
      static_cast<int> (this->delivery_requests_.size ())
      ));

  Delivery_Request_Ptr request (new Delivery_Request (this->this_ptr_, request_id));
  this->delivery_requests_.push_back (request);
  TAO_Notify_Method_Request_Lookup_Queueable method (request, pc);

  if (this->state_ == rssCREATING)
  {
    if (! reliable_channel)
    {
      enter_state_transient (guard);
    }
    else if (ACE_Dynamic_Service <TAO_Notify::Event_Persistence_Strategy>::instance ("Event_Persistence") == 0)
    {
      enter_state_transient (guard);
    }
    else if (! this->event_->reliable().is_valid())
    {
      enter_state_new (guard);
    }
    else if (this->event_->reliable().value() == true)
    {
      enter_state_new (guard);
    }
    else
    {
      enter_state_transient (guard);
    }
  }
  else
  {
    // We only need to release the guard if the state is rssCREATING.
    // By calling enter_state_*, we are guaranteed that the guard has
    // been released.
    guard.release ();
  }
  pc->execute_task (method);
}
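
The chain of tests that route() applies while the slip is still in rssCREATING decides whether the event is persisted (rssNEW) or handled in memory only (rssTRANSIENT). The sketch below restates that chain as a pure function; `choose_initial_state` and `InitialState` are hypothetical names, and std::optional<bool> is an assumed stand-in for the event's reliability property, which may be unset.

```cpp
#include <optional>

enum class InitialState { Transient, New };

// reliable_channel : the channel asked for reliable delivery
// has_persistence  : an "Event_Persistence" strategy is loaded
// event_reliable   : the event's own reliability setting, empty if unset
InitialState choose_initial_state(bool reliable_channel,
                                  bool has_persistence,
                                  std::optional<bool> event_reliable) {
  if (!reliable_channel) return InitialState::Transient;
  if (!has_persistence) return InitialState::Transient;
  // An event that does not state a preference inherits the channel's.
  if (!event_reliable.has_value()) return InitialState::New;
  return *event_reliable ? InitialState::New : InitialState::Transient;
}
```

Note that an event can opt out of persistence on a reliable channel, but it cannot opt in on a channel that is not reliable.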

int TAO_Notify::Routing_Slip::sequence (  )  const

Provide an identifying number for this Routing Slip to use in debug messages.

Definition at line 923 of file Routing_Slip.cpp.

{
  return this->sequence_;
}

void TAO_Notify::Routing_Slip::set_rspm ( Routing_Slip_Persistence_Manager * rspm )

Definition at line 158 of file Routing_Slip.cpp.

{
  this->rspm_ = rspm;
  if (rspm_ != 0)
  {
    rspm->set_callback (this);
  }
}

bool TAO_Notify::Routing_Slip::should_retry (  )  const

Should delivery of this event be retried if it fails?

Definition at line 929 of file Routing_Slip.cpp.

{
  // simple minded test: if it's transient, don't retry it
  // @@todo Eventually this should check timeout, discard policy, etc.
  return this->state_ != rssTRANSIENT;
}

bool TAO_Notify::Routing_Slip::unmarshal ( TAO_Notify_EventChannelFactory & ecf,
TAO_InputCDR & rscdr
) [private]

Unmarshal from a CDR.

Definition at line 852 of file Routing_Slip.cpp.

{
  CORBA::ULong count = 0;
  cdr.read_ulong (count);
  for (size_t nreq = 0; nreq < count; ++nreq)
  {
    ACE_CDR::Octet code = 0;
    while (cdr.read_octet(code))
    {
      try
      {
        if (code == TAO_Notify_Method_Request_Dispatch::persistence_code)
        {
          Delivery_Request * prequest;
          ACE_NEW_THROW_EX (
            prequest,
            Delivery_Request(this_ptr_, this->delivery_requests_.size ()),
            CORBA::NO_MEMORY ());
          Delivery_Request_Ptr request(prequest);
          TAO_Notify_Method_Request_Dispatch_Queueable * method =
            TAO_Notify_Method_Request_Dispatch::unmarshal (
              request,
              ecf,
              cdr);
          if (method != 0)
          {
            this->delivery_requests_.push_back (request);
            this->delivery_methods_.push_back (method);
          }
        }
        else if (code == TAO_Notify_Method_Request_Lookup::persistence_code)
        {
          Delivery_Request_Ptr request(new Delivery_Request(this_ptr_, this->delivery_requests_.size ()));
          TAO_Notify_Method_Request_Lookup_Queueable * method =
              TAO_Notify_Method_Request_Lookup::unmarshal (
                request,
                ecf,
                cdr);
          if (method != 0)
          {
            this->delivery_requests_.push_back (request);
            this->delivery_methods_.push_back (method);
          }
        }
      }
      catch (const CORBA::Exception&)
      {
        // @@todo should we log this?
        // just ignore failures
      }
    }
  }
  return this->delivery_requests_.size () > 0;
}

void TAO_Notify::Routing_Slip::wait_persist (  ) 

Wait until the event/routing_slip has been saved at least once.

Definition at line 219 of file Routing_Slip.cpp.

{
  Routing_Slip_Guard guard (this->internals_);
  while (!this->is_safe_)
  {
    this->until_safe_.wait ();
  }
}
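
wait_persist() is the waiting half of a flag-plus-condition pattern: is_safe_ is set and until_safe_ is signalled in enter_state_transient(), enter_state_complete_while_new(), and persist_complete(). A minimal sketch of the same pattern with the standard library follows, assuming std::mutex and std::condition_variable in place of the ACE types; `SafeFlag` and its member names are hypothetical. Unlike the original, which calls signal(), this sketch wakes all waiters.

```cpp
#include <condition_variable>
#include <mutex>

class SafeFlag {
public:
  // Block until mark_safe() has been called at least once.
  // The predicate form re-checks the flag after every wakeup,
  // like the while loop in wait_persist().
  void wait_until_safe() {
    std::unique_lock<std::mutex> lock(mutex_);
    until_safe_.wait(lock, [this] { return is_safe_; });
  }

  // Set the flag once and wake every waiter; later calls do nothing.
  void mark_safe() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!is_safe_) {
      is_safe_ = true;
      until_safe_.notify_all();
    }
  }

  bool is_safe() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return is_safe_;
  }

private:
  mutable std::mutex mutex_;
  std::condition_variable until_safe_;
  bool is_safe_ = false;
};
```

Because the flag is never cleared, a thread that calls wait_until_safe() after mark_safe() returns immediately rather than missing the notification.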


Member Data Documentation

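size_t TAO_Notify::Routing_Slip::complete_requests_ [private]

How many delivery requests are complete.

Definition at line 207 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_continue_changed_ = 0 [static, private]

Definition at line 228 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_continue_changed_while_saving_ = 0 [static, private]

Definition at line 226 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_continue_new_ = 0 [static, private]

Definition at line 220 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_continue_transient_ = 0 [static, private]

Definition at line 217 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_enter_changed_ = 0 [static, private]

Definition at line 227 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_enter_changed_while_saving_ = 0 [static, private]

Definition at line 225 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_enter_complete_ = 0 [static, private]

Definition at line 229 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_enter_complete_while_new_ = 0 [static, private]

Definition at line 221 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_enter_deleting_ = 0 [static, private]

Definition at line 230 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_enter_new_ = 0 [static, private]

Definition at line 219 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_enter_reloaded_ = 0 [static, private]

Definition at line 218 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_enter_saved_ = 0 [static, private]

Definition at line 223 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_enter_saving_ = 0 [static, private]

Definition at line 222 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_enter_terminal_ = 0 [static, private]

Definition at line 231 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_enter_transient_ = 0 [static, private]

Definition at line 216 of file Routing_Slip.h.

size_t TAO_Notify::Routing_Slip::count_enter_updating_ = 0 [static, private]

Definition at line 224 of file Routing_Slip.h.

Delivery_Method_Vec TAO_Notify::Routing_Slip::delivery_methods_ [private]

Methods that should be restarted during event recovery.

Definition at line 204 of file Routing_Slip.h.

Delivery_Request_Vec TAO_Notify::Routing_Slip::delivery_requests_ [private]

A collection of delivery requests.

Definition at line 201 of file Routing_Slip.h.

TAO_Notify_Event::Ptr TAO_Notify::Routing_Slip::event_ [private]

Definition at line 179 of file Routing_Slip.h.

TAO_SYNCH_MUTEX TAO_Notify::Routing_Slip::internals_ [private]

Protection for internal information.

Definition at line 167 of file Routing_Slip.h.

bool TAO_Notify::Routing_Slip::is_safe_ [private]

true when event persistence qos is guaranteed

Definition at line 169 of file Routing_Slip.h.

Routing_Slip_Queue TAO_Notify::Routing_Slip::persistent_queue_ [static, private]

Definition at line 233 of file Routing_Slip.h.

int TAO_Notify::Routing_Slip::routing_slip_sequence_ = 0 [static, private]

Definition at line 215 of file Routing_Slip.h.

Routing_Slip_Persistence_Manager * TAO_Notify::Routing_Slip::rspm_ [private]

Pointer to a Routing_Slip_Persistence_Manager.

Definition at line 210 of file Routing_Slip.h.

int TAO_Notify::Routing_Slip::sequence_ [private]

Definition at line 212 of file Routing_Slip.h.

TAO_SYNCH_MUTEX TAO_Notify::Routing_Slip::sequence_lock_ [static, private]

Definition at line 214 of file Routing_Slip.h.

enum TAO_Notify::Routing_Slip::State TAO_Notify::Routing_Slip::state_ [private]

A mini-state machine to control persistence. See external doc for circles and arrows.

Routing_Slip_Ptr TAO_Notify::Routing_Slip::this_ptr_ [private]

Smart pointer to this object. Provides continuity between smart pointers and "Routing_Slip::this". Also lets the Routing_Slip manage its own minimum lifetime.

Definition at line 176 of file Routing_Slip.h.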

ACE_SYNCH_CONDITION TAO_Notify::Routing_Slip::until_safe_ [private]

signalled when is_safe_ goes true

Definition at line 171 of file Routing_Slip.h.


The documentation for this class was generated from the following files:

Routing_Slip.h
Routing_Slip.cpp