
TAO_Notify_Consumer Class Reference

Abstract base class for wrapping consumer objects that connect to the EventChannel.

#include <Consumer.h>


Public Types

enum  DispatchStatus {
  DISPATCH_SUCCESS, DISPATCH_RETRY, DISPATCH_DISCARD, DISPATCH_FAIL,
  DISPATCH_FAIL_TIMEOUT
}
 Status returned from dispatch attempts.
typedef TAO_Notify_Refcountable_Guard_T< TAO_Notify_Consumer > Ptr

Public Member Functions

 TAO_Notify_Consumer (TAO_Notify_ProxySupplier *proxy)
 Constructor.
virtual ~TAO_Notify_Consumer ()
 Destructor.
CORBA::ULong _incr_refcnt (void)
 These method signatures deliberately match the RefCounting methods required for ESF Proxy.
CORBA::ULong _decr_refcnt (void)
TAO_Notify_ProxySupplier * proxy_supplier (void)
 Access Specific Proxy.
virtual TAO_Notify_Proxy * proxy (void)
 Access Base Proxy.
void deliver (TAO_Notify_Method_Request_Event *request)
 Dispatch Event to consumer.
virtual void push (const CORBA::Any &event)=0
 Push event to this consumer.
virtual void push (const CosNotification::StructuredEvent &event)=0
 Push event to this consumer.
virtual void push (const CosNotification::EventBatch &event)=0
 Push a batch of events to this consumer.
DispatchStatus dispatch_batch (const CosNotification::EventBatch &batch)
 Dispatch the batch of events to the attached consumer.
void dispatch_pending (void)
 Dispatch the pending events.
CORBA::Boolean is_suspended (void)
 Is the connection suspended?
void suspend (void)
 Suspend Connection.
void resume (void)
 Resume Connection.
virtual void shutdown (void)
 Shutdown the consumer.
virtual void reconnect_from_consumer (TAO_Notify_Consumer *old_consumer)=0
virtual void qos_changed (const TAO_Notify_QoSProperties &qos_properties)
 Override, Peer::qos_changed.
void assume_pending_events (TAO_Notify_Consumer &rhs)
bool is_alive (bool allow_nil_consumer)
 Is the connected consumer still around?
size_t pending_count (void)

Protected Types

typedef ACE_Unbounded_Queue< TAO_Notify_Method_Request_Event_Queueable * > Request_Queue

Protected Member Functions

virtual CORBA::Object_ptr get_consumer (void)=0
DispatchStatus dispatch_request (TAO_Notify_Method_Request_Event *request)
virtual bool dispatch_from_queue (Request_Queue &requests, ACE_Guard< TAO_SYNCH_MUTEX > &ace_mon)
 Attempt to dispatch event from a queue.
void enqueue_request (TAO_Notify_Method_Request_Event *request)
virtual bool enqueue_if_necessary (TAO_Notify_Method_Request_Event *request)
virtual void dispatch_updates_i (const CosNotification::EventTypeSeq &added, const CosNotification::EventTypeSeq &removed)
TAO_SYNCH_MUTEX * proxy_lock (void)
 Get the shared Proxy Lock.
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
void schedule_timer (bool is_error=false)
 Schedule timer.
void cancel_timer (void)
 Cancel timer.
Request_Queue & pending_events ()
 = Protected Data Members

Protected Attributes

TAO_Notify_ProxySupplier * proxy_
 The Proxy that we associate with.
CORBA::Boolean is_suspended_
 Suspended Flag.
CosNotifyComm::NotifyPublish_var publish_
 Interface that accepts offer_changes.
bool have_not_yet_verified_publish_
const TAO_Notify_Property_Time & pacing_
 The Pacing Interval.
TAO_Notify_Property_Long max_batch_size_
 Max. batch size.
long timer_id_
 Timer Id.
TAO_Notify_Timer::Ptr timer_
 The Timer Manager that we use.
ACE_Atomic_Op< TAO_SYNCH_MUTEX, ACE_Time_Value > last_ping_

Private Attributes

ACE_Auto_Ptr< Request_Queue > pending_events_
 Events pending to be delivered.
CORBA::Object_var rtt_obj_

Detailed Description

Abstract base class for wrapping consumer objects that connect to the EventChannel.

Definition at line 44 of file Consumer.h.


Member Typedef Documentation

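typedef TAO_Notify_Refcountable_Guard_T< TAO_Notify_Consumer > TAO_Notify_Consumer::Ptr

Definition at line 61 of file Consumer.h.

typedef ACE_Unbounded_Queue< TAO_Notify_Method_Request_Event_Queueable * > TAO_Notify_Consumer::Request_Queue [protected]

Definition at line 136 of file Consumer.h.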


Member Enumeration Documentation

enum TAO_Notify_Consumer::DispatchStatus

Status returned from dispatch attempts.

Enumerator:
DISPATCH_SUCCESS 
DISPATCH_RETRY 
DISPATCH_DISCARD 
DISPATCH_FAIL 
DISPATCH_FAIL_TIMEOUT 

Definition at line 51 of file Consumer.h.

                      {
    DISPATCH_SUCCESS,
    DISPATCH_RETRY,   // retry this message
    DISPATCH_DISCARD, // discard this message
    DISPATCH_FAIL,    // discard all messages and disconnect consumer
    DISPATCH_FAIL_TIMEOUT // Same as DISPATCH_FAIL, but due to a timeout
  };
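
The enumerator comments imply a different follow-up action for each status. The following is a minimal sketch of that mapping, not code from TAO: the enum is a stand-in copy of DispatchStatus, and describe_action is a hypothetical helper introduced only for illustration.

```cpp
#include <cassert>
#include <string>

// Stand-in for TAO_Notify_Consumer::DispatchStatus (same enumerators).
enum DispatchStatus {
  DISPATCH_SUCCESS,
  DISPATCH_RETRY,
  DISPATCH_DISCARD,
  DISPATCH_FAIL,
  DISPATCH_FAIL_TIMEOUT
};

// Hypothetical helper: names the action each status calls for,
// following the comments on the enumerators.
inline std::string describe_action (DispatchStatus status)
{
  switch (status)
    {
    case DISPATCH_SUCCESS:      return "complete";
    case DISPATCH_RETRY:        return "requeue and retry";
    case DISPATCH_DISCARD:      return "discard this message";
    case DISPATCH_FAIL_TIMEOUT: // handled like DISPATCH_FAIL
    case DISPATCH_FAIL:         return "discard all and disconnect";
    }
  return "unknown";
}
```

Grouping DISPATCH_FAIL_TIMEOUT with DISPATCH_FAIL mirrors the fall-through used in deliver(), where the only difference is the flag passed to destroy().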


Constructor & Destructor Documentation

TAO_Notify_Consumer::TAO_Notify_Consumer ( TAO_Notify_ProxySupplier * proxy  ) 

Constructor.

Definition at line 34 of file Consumer.cpp.

: proxy_ (proxy)
, is_suspended_ (0)
, have_not_yet_verified_publish_ (true)
, pacing_ (proxy->qos_properties_.pacing_interval ())
, max_batch_size_ (CosNotification::MaximumBatchSize, 0)
, timer_id_ (-1)
, timer_ (0)
{
  Request_Queue* pending_events = 0;
  ACE_NEW (pending_events, TAO_Notify_Consumer::Request_Queue ());
  this->pending_events_.reset( pending_events );

  this->timer_.reset( this->proxy ()->timer () );

  // Enable reference counting on the event handler.
  this->reference_counting_policy ().value (
    ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
}

TAO_Notify_Consumer::~TAO_Notify_Consumer (  )  [virtual]

Destructor.

Definition at line 54 of file Consumer.cpp.

{
  if (this->timer_.isSet())
  {
    this->cancel_timer ();
    this->timer_.reset ();
  }
}


Member Function Documentation

CORBA::ULong TAO_Notify_Consumer::_decr_refcnt ( void   ) 

Definition at line 70 of file Consumer.cpp.

{
  return this->remove_reference();
}

CORBA::ULong TAO_Notify_Consumer::_incr_refcnt ( void   ) 

These method signatures deliberately match the RefCounting methods required for ESF Proxy.

Definition at line 64 of file Consumer.cpp.

{
  return this->add_reference();
}

void TAO_Notify_Consumer::assume_pending_events ( TAO_Notify_Consumer & rhs  ) 

Take the pending queue from the rhs, cancel its timer and schedule our timer. The caller should have locked the proxy lock before calling this method.

Definition at line 716 of file Consumer.cpp.

{
  // No need to lock this proxy's lock.  It should have been locked
  // by the caller.

  // If the original consumer has pending events
  if (!rhs.pending_events ().is_empty ())
    {
      // We will take them away and cancel its timer
      this->pending_events_.reset (rhs.pending_events_.release ());
      if (rhs.timer_.isSet ())
        {
          rhs.cancel_timer ();
        }

      // Schedule a new timer for us, which will use the default
      // timer value (unless we have a valid pacing interval).
      this->schedule_timer ();
    }
  if (this->is_suspended()) // double check to avoid race
  {
    this->cancel_timer();
  }
}
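
The queue takeover above amounts to an ownership transfer between two smart pointers. A rough sketch of the same idea with std::unique_ptr follows; MiniConsumer, its integer "events" and its timer_scheduled flag are hypothetical stand-ins, and the suspended-state recheck and the proxy lock are left out.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <utility>

// Hypothetical stand-in for a consumer that owns a pending-event queue.
struct MiniConsumer
{
  typedef std::deque<int> Queue;   // ints stand in for queued requests
  std::unique_ptr<Queue> pending;
  bool timer_scheduled;

  MiniConsumer () : pending (new Queue), timer_scheduled (false) {}

  // Same shape as assume_pending_events(): if rhs has pending events,
  // take over its whole queue, cancel its timer and schedule ours.
  void assume_pending_events (MiniConsumer &rhs)
  {
    if (!rhs.pending->empty ())
      {
        // Our previous queue is destroyed; rhs.pending is left null,
        // comparable to releasing an auto pointer.
        pending = std::move (rhs.pending);
        rhs.timer_scheduled = false;
        timer_scheduled = true;
      }
  }
};
```

Note that any events already held by the receiving consumer are dropped when its old queue is replaced, which is also what the reset() call in the listing does.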

void TAO_Notify_Consumer::cancel_timer ( void   )  [protected]

Cancel timer.

Definition at line 645 of file Consumer.cpp.

{
  if (this->timer_.isSet() && this->timer_id_ != -1)
    {
      if (DEBUG_LEVEL  > 5)
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("Consumer %d canceling dispatch timer.\n"),
                    static_cast<int> (this->proxy ()->id ())
                    ));

      this->timer_->cancel_timer (timer_id_);
    }
  this->timer_id_ = -1;
}

void TAO_Notify_Consumer::deliver ( TAO_Notify_Method_Request_Event * request  ) 

Dispatch Event to consumer.

Definition at line 163 of file Consumer.cpp.

{
  // Increment reference counts (safely) to prevent this object and its proxy
  // from being deleted while the push is in progress.
  TAO_Notify_Proxy::Ptr proxy_guard (this->proxy ());
  bool queued = enqueue_if_necessary (request);
  if (!queued)
    {
      bool from_timeout = false;
      DispatchStatus status = this->dispatch_request (request);
      switch (status)
        {
        case DISPATCH_SUCCESS:
          {
            request->complete ();
            break;
          }
        case DISPATCH_RETRY:
          {
            if (DEBUG_LEVEL > 1)
              ACE_DEBUG ((LM_DEBUG,
                          ACE_TEXT ("Consumer %d enqueing event %d due ")
                          ACE_TEXT ("to failed dispatch.\n"),
                          static_cast<int> (this->proxy ()->id ()),
                          request->sequence ()));
            this->enqueue_request (request);
            this->schedule_timer (true);
            break;
          }
        case DISPATCH_DISCARD:
          {
            if (DEBUG_LEVEL  > 0)
              ACE_DEBUG ((LM_DEBUG,
                          ACE_TEXT ("(%P|%t) Consumer %d: Error during ")
                          ACE_TEXT ("direct dispatch. Discarding event:%d.\n"),
                          static_cast<int> (this->proxy ()->id ()),
                          request->sequence ()
                          ));
            request->complete ();
            break;
          }
        case DISPATCH_FAIL_TIMEOUT:
          from_timeout = true;
          // Fall through
        case DISPATCH_FAIL:
          {
            if (DEBUG_LEVEL  > 0)
              ACE_DEBUG ((LM_DEBUG,
                          ACE_TEXT ("(%P|%t) Consumer %d: Failed during ")
                          ACE_TEXT ("direct dispatch :%d. Discarding event.\n"),
                          static_cast<int> (this->proxy ()->id ()),
                          request->sequence ()
                          ));
            request->complete ();
            try
              {
                this->proxy_supplier ()->destroy (from_timeout);
              }
            catch (const CORBA::Exception&)
              {
                // todo is there something meaningful we can do here?
                ;
              }
            break;
          }
        }
    }
}

TAO_Notify_Consumer::DispatchStatus TAO_Notify_Consumer::dispatch_batch ( const CosNotification::EventBatch & batch  ) 

Dispatch the batch of events to the attached consumer.

Definition at line 368 of file Consumer.cpp.

{
  DispatchStatus result = DISPATCH_SUCCESS;
  try
    {
      this->push (batch);
    }
  catch (const CORBA::OBJECT_NOT_EXIST& ex)
    {
      if (DEBUG_LEVEL  > 0)
        ACE_DEBUG ((LM_ERROR,
                    ACE_TEXT ("(%P|%t) TAO_Notify_Consumer ")
                    ACE_TEXT ("%d::dispatch_batch() %s\n"),
                    static_cast<int> (this->proxy ()->id ()),
                    ex._info ().c_str ()
                    ));
      result = DISPATCH_FAIL;
    }
  catch (const CORBA::TRANSIENT& ex)
    {
      if (DEBUG_LEVEL  > 0)
        ACE_DEBUG ((LM_ERROR,
                    ACE_TEXT ("(%P|%t) TAO_Notify_Consumer ")
                    ACE_TEXT ("%d::dispatch_batch() Transient (minor=%d) %s\n"),
                    static_cast<int> (this->proxy ()->id ()),
                    ex.minor (),
                    ex._info ().c_str ()
                    ));
      const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
      switch (ex.minor () & 0xfffff000u)
        {
        case CORBA::OMGVMCID:
          switch (ex.minor () & 0x00000fffu)
            {
            case 2: // No usable profile
            case 3: // Request cancelled
            case 4: // POA destroyed
              result = DISPATCH_FAIL;
              break;
            default:
              result = DISPATCH_DISCARD;
            }
          break;

        case TAO::VMCID:
        default:
          switch (ex.minor () & BITS_5_THRU_12_MASK)
            {
            case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
              result = DISPATCH_FAIL;
              break;
            case TAO_POA_DISCARDING:
            case TAO_POA_HOLDING:
            default:
              result = DISPATCH_RETRY;
            } break;
        }
    }
  catch (const CORBA::TIMEOUT& ex)
    {
      if (DEBUG_LEVEL  > 0)
        ACE_DEBUG ((LM_ERROR,
                    ACE_TEXT ("(%P|%t) TAO_Notify_Consumer ")
                    ACE_TEXT ("%u::dispatch_batch() %s\n"),
                    this->proxy ()->id (),
                    ex._info().c_str ()
                    ));
      result = DISPATCH_FAIL_TIMEOUT;
    }
  catch (const CORBA::COMM_FAILURE& ex)
    {
      if (DEBUG_LEVEL  > 0)
        ACE_DEBUG ((LM_ERROR,
                    ACE_TEXT ("(%P|%t) TAO_Notify_Consumer ")
                    ACE_TEXT ("%u::dispatch_batch() %s\n"),
                    this->proxy ()->id (),
                    ex._info().c_str ()
                    ));
      result = DISPATCH_FAIL;
    }
  catch (const CORBA::SystemException& ex)
    {
      if (DEBUG_LEVEL  > 0)
        {
          ACE_DEBUG ((LM_ERROR,
                      ACE_TEXT ("(%P|%t) TAO_Notify_Consumer ")
                      ACE_TEXT ("%d::dispatch_batch() SystemException %s\n"),
                      static_cast<int>  (this->proxy ()->id ()),
                      ex._info ().c_str ()
                      ));
        }
      result = DISPATCH_DISCARD;
    }
  catch (const CORBA::Exception&)
    {
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("(%P|%t) TAO_Notify_Consumer ")
                  ACE_TEXT ("%d::dispatch_batch() Caught unexpected ")
                  ACE_TEXT ("exception pushing batch to consumer.\n"),
                  static_cast<int> (this->proxy ()->id ())
                  ));
      result = DISPATCH_DISCARD;
    }
  return result;
}
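
The CORBA::TRANSIENT handler splits the minor code into a vendor ID (upper 20 bits) and a vendor-specific part (lower 12 bits). The sketch below isolates that classification as a pure function. It is an illustration under assumptions: classify_transient and Outcome are hypothetical names, and SEND_REQUEST_MINOR is an assumed stand-in for TAO_INVOCATION_SEND_REQUEST_MINOR_CODE whose real value should be taken from the TAO headers.

```cpp
#include <cassert>
#include <cstdint>

enum Outcome { OUTCOME_FAIL, OUTCOME_DISCARD, OUTCOME_RETRY };

// The OMG vendor minor code ID occupies the upper 20 bits.
const std::uint32_t OMG_VMCID = 0x4f4d0000u;
// Assumed stand-in for TAO_INVOCATION_SEND_REQUEST_MINOR_CODE;
// consult the TAO headers for the real value.
const std::uint32_t SEND_REQUEST_MINOR = 0x02u << 7;

// Hypothetical helper mirroring the nested switch in the listing.
inline Outcome classify_transient (std::uint32_t minor)
{
  if ((minor & 0xfffff000u) == OMG_VMCID)
    {
      switch (minor & 0x00000fffu)
        {
        case 2: // No usable profile
        case 3: // Request cancelled
        case 4: // POA destroyed
          return OUTCOME_FAIL;
        default:
          return OUTCOME_DISCARD;
        }
    }
  // TAO's own vendor ID and any unknown vendor take the same path.
  return ((minor & 0x00000f80u) == SEND_REQUEST_MINOR)
           ? OUTCOME_FAIL : OUTCOME_RETRY;
}
```

For example, classify_transient (0x4f4d0002u) yields OUTCOME_FAIL, while an OMG minor code outside 2 to 4 yields OUTCOME_DISCARD.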

bool TAO_Notify_Consumer::dispatch_from_queue ( Request_Queue & requests,
ACE_Guard< TAO_SYNCH_MUTEX > &  ace_mon 
) [protected, virtual]

Attempt to dispatch event from a queue.

Called by dispatch_pending. Deliver one or more events to the Consumer. If delivery fails, events are left in the queue (or discarded, depending on QoS parameters). Undelivered, undiscarded requests are left at the front of the queue. Overridden in the sequence consumer to dispatch as an EventBatch.

Returns:
false if delivery failed and the request(s) cannot be discarded.

Reimplemented in TAO_Notify_SequencePushConsumer.

Definition at line 505 of file Consumer.cpp.

{
  bool result = true;
  TAO_Notify_Method_Request_Event_Queueable * request;
  if (requests.dequeue_head (request) == 0)
    {
      ace_mon.release ();
      DispatchStatus status = this->dispatch_request (request);
      switch (status)
        {
        case DISPATCH_SUCCESS:
          {
            request->complete ();
            request->release ();
            result = true;
            ace_mon.acquire ();
            break;
          }
        case DISPATCH_RETRY:
          {
            if (DEBUG_LEVEL  > 0)
              ACE_DEBUG ((LM_DEBUG,
                          ACE_TEXT ("(%P|%t) Consumer %d: Will retry %d\n"),
                          static_cast<int> (this->proxy ()->id ()),
                          request->sequence ()
                          ));
            ace_mon.acquire ();
            requests.enqueue_head (request); // put the failed event back where it was
            result = false;
            break;
          }
        case DISPATCH_DISCARD:
          {
            if (DEBUG_LEVEL  > 0)
              ACE_DEBUG ((LM_DEBUG,
                          ACE_TEXT ("(%P|%t) Consumer %d: Error during ")
                          ACE_TEXT ("dispatch. Discarding event:%d.\n"),
                          static_cast<int> (this->proxy ()->id ()),
                          request->sequence ()
                          ));
            request->complete ();
            ace_mon.acquire ();
            result = true;
            break;
          }
        case DISPATCH_FAIL:
          {
            if (DEBUG_LEVEL  > 0)
              ACE_DEBUG ((LM_DEBUG,
                          ACE_TEXT ("(%P|%t) Consumer %d: Failed. ")
                          ACE_TEXT ("Discarding event %d.\n"),
                          static_cast<int> (this->proxy ()->id ()),
                          request->sequence ()
                          ));
            request->complete ();
            ace_mon.acquire ();
            while (requests.dequeue_head (request) == 0)
              {
                ace_mon.release ();
                request->complete ();
                ace_mon.acquire ();
              }
            ace_mon.release ();
            try
              {
                this->proxy_supplier ()->destroy ();
              }
            catch (const CORBA::Exception&)
              {
                // todo is there something reasonable to do here?
              }
            ace_mon.acquire ();
            result = true;
            break;
          }
        default:
          {
            ace_mon.acquire ();
            result = false;
            break;
          }
        }
    }
  return result;
}

void TAO_Notify_Consumer::dispatch_pending ( void   ) 

Dispatch the pending events.

Definition at line 475 of file Consumer.cpp.

{
  if (DEBUG_LEVEL  > 5)
    ACE_DEBUG ((LM_DEBUG,
                ACE_TEXT ("Consumer %d dispatching pending events.  Queue size: %d\n"),
                static_cast<int> (this->proxy ()->id ()),
                this->pending_events().size ()
                ));

  // lock ourselves in memory for the duration
  TAO_Notify_Consumer::Ptr self_grd (this);

  // dispatch events until: 1) the queue is empty; 2) the proxy shuts down, or 3) the dispatch fails
  ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
  bool ok = true;
  while (ok
         && !this->proxy_supplier ()->has_shutdown ()
         && !this->pending_events().is_empty ())
    {
      if (! dispatch_from_queue ( this->pending_events(), ace_mon))
        {
          this->schedule_timer (true);
          ok = false;
        }
    }
}
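
Stripped of locking, reference guards and the proxy shutdown check, the loop above is a drain-until-failure pattern. A minimal model is sketched here; drain, the integer events and the try_send callback are hypothetical and only illustrate the control flow.

```cpp
#include <cassert>
#include <deque>
#include <functional>

// Hypothetical model: try_send returns false when an event must be retried.
// Returns true if the caller would need to schedule a retry timer.
inline bool drain (std::deque<int> &queue,
                   const std::function<bool (int)> &try_send)
{
  while (!queue.empty ())
    {
      int event = queue.front ();
      queue.pop_front ();
      if (!try_send (event))
        {
          queue.push_front (event); // put the failed event back where it was
          return true;              // stop; a retry timer is needed
        }
    }
  return false;                     // queue fully drained
}
```

Returning the failed event to the head of the queue preserves delivery order for the next attempt, which is the same choice dispatch_from_queue() makes for DISPATCH_RETRY.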

TAO_Notify_Consumer::DispatchStatus TAO_Notify_Consumer::dispatch_request ( TAO_Notify_Method_Request_Event * request  )  [protected]

Definition at line 233 of file Consumer.cpp.

{
  DispatchStatus result = DISPATCH_SUCCESS;
  try
    {
      request->event ()->push (this);
      if (DEBUG_LEVEL  > 8)
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("Consumer %d dispatched single event %d.\n"),
                    static_cast<int> (this->proxy ()->id ()),
                    request->sequence ()
                    ));
    }
  catch (const CORBA::OBJECT_NOT_EXIST& ex)
    {
      if (DEBUG_LEVEL  > 0)
        {
          ACE_DEBUG ((LM_ERROR,
                      ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push ")
                      ACE_TEXT ("(request) %s\n"),
                      static_cast<int> (this->proxy ()->id ()),
                      ex._info ().c_str ()
                      ));
        }
      result = DISPATCH_FAIL;
    }
  catch (const CORBA::TRANSIENT& ex)
    {
      if (DEBUG_LEVEL  > 0)
        ACE_DEBUG ((LM_ERROR,
                    ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push ")
                    ACE_TEXT ("(request) Transient (minor=%d) %s\n"),
                    static_cast<int> (this->proxy ()->id ()),
                    ex.minor (),
                    ex._info ().c_str ()
                    ));
      const CORBA::ULong BITS_5_THRU_12_MASK = 0x00000f80u;
      switch (ex.minor () & 0xfffff000u)
        {
        case CORBA::OMGVMCID:
          switch (ex.minor () & 0x00000fffu)
            {
            case 2: // No usable profile
            case 3: // Request cancelled
            case 4: // POA destroyed
              result = DISPATCH_FAIL;
              break;
            default:
              result = DISPATCH_DISCARD;
            }
          break;

        case TAO::VMCID:
        default:
          switch (ex.minor () & BITS_5_THRU_12_MASK)
            {
            case TAO_INVOCATION_SEND_REQUEST_MINOR_CODE:
              result = DISPATCH_FAIL;
              break;
            case TAO_POA_DISCARDING:
            case TAO_POA_HOLDING:
            default:
              result = DISPATCH_RETRY;
            } break;
        }
    }
  catch (const CORBA::TIMEOUT& ex)
    {
      if (DEBUG_LEVEL  > 0)
        ACE_DEBUG ((LM_ERROR,
                    ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push ")
                    ACE_TEXT ("(request) %s\n"),
                    this->proxy ()->id (),
                    ex._info().c_str ()
                    ));
      result = DISPATCH_FAIL_TIMEOUT;
    }
  catch (const CORBA::COMM_FAILURE& ex)
    {
      if (DEBUG_LEVEL  > 0)
        ACE_DEBUG ((LM_ERROR,
                    ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %u::push ")
                    ACE_TEXT ("(request) %s\n"),
                    this->proxy ()->id (),
                    ex._info().c_str ()
                    ));
      result = DISPATCH_FAIL;
    }
  catch (const CORBA::SystemException& ex)
    {
      if (DEBUG_LEVEL  > 0)
        {
          ACE_DEBUG ((LM_ERROR,
                      ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push ")
                      ACE_TEXT ("(request) SystemException %s\n"),
                      static_cast<int> (this->proxy ()->id ()),
                      ex._info ().c_str ()
                      ));
        }
      result = DISPATCH_DISCARD;
    }
  catch (const CORBA::Exception&)
    {
      ACE_ERROR ( (LM_ERROR,
                   ACE_TEXT ("(%P|%t) TAO_Notify_Consumer %d::push ")
                   ACE_TEXT ("(request) Caught unexpected exception ")
                   ACE_TEXT ("pushing event to consumer.\n"),
                   static_cast<int> (this->proxy ()->id ())
                   ));
      result = DISPATCH_DISCARD;
    }

  // for persistent events that haven't timed out
  // convert "FAIL" & "DISCARD" to "RETRY"
  // for transient events, convert RETRY to DISCARD (hey, best_effort.)
  if (result == DISPATCH_FAIL ||
      result == DISPATCH_FAIL_TIMEOUT || result == DISPATCH_DISCARD)
    {
      if (request->should_retry ())
        {
          result = DISPATCH_RETRY;
        }
    }
  else if (result == DISPATCH_RETRY)
    {
      if (! request->should_retry ())
        {
          result = DISPATCH_DISCARD;
        }
    }

  return result;
}
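
The final block of dispatch_request() rewrites the status according to whether the request should be retried. A compact sketch of that conversion as a standalone function follows; the enum is a stand-in copy of DispatchStatus and adjust_for_retry_policy is a hypothetical name.

```cpp
#include <cassert>

// Stand-in for TAO_Notify_Consumer::DispatchStatus (same enumerators).
enum DispatchStatus {
  DISPATCH_SUCCESS,
  DISPATCH_RETRY,
  DISPATCH_DISCARD,
  DISPATCH_FAIL,
  DISPATCH_FAIL_TIMEOUT
};

// Hypothetical helper: should_retry plays the role of
// request->should_retry () in the listing.
inline DispatchStatus adjust_for_retry_policy (DispatchStatus result,
                                               bool should_retry)
{
  switch (result)
    {
    case DISPATCH_FAIL:
    case DISPATCH_FAIL_TIMEOUT:
    case DISPATCH_DISCARD:
      // Retryable requests are never dropped on failure.
      return should_retry ? DISPATCH_RETRY : result;
    case DISPATCH_RETRY:
      // Non-retryable (best effort) requests are not queued for retry.
      return should_retry ? DISPATCH_RETRY : DISPATCH_DISCARD;
    default:
      return result; // DISPATCH_SUCCESS is left untouched
    }
}
```

One consequence worth noting: for a request that should be retried, the caller never sees DISPATCH_FAIL or DISPATCH_FAIL_TIMEOUT, so the proxy is not destroyed on that path.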

void TAO_Notify_Consumer::dispatch_updates_i ( const CosNotification::EventTypeSeq & added,
const CosNotification::EventTypeSeq & removed 
) [protected, virtual]

Definition at line 691 of file Consumer.cpp.

{
  if (this->have_not_yet_verified_publish_)
    {
      this->have_not_yet_verified_publish_ = false; // no need to check again
      if (! this->publish_->_is_a ("IDL:omg.org/CosNotifyComm/NotifyPublish:1.0"))
        this->publish_ = CosNotifyComm::NotifyPublish::_nil();
    }
  if (! CORBA::is_nil (this->publish_.in ()))
    this->publish_->offer_change (added, removed);
}

bool TAO_Notify_Consumer::enqueue_if_necessary ( TAO_Notify_Method_Request_Event * request  )  [protected, virtual]

Add request to a queue if necessary. Overridden by sequence consumer to "always" put incoming events into the queue.

Returns:
true if the request has been enqueued; false if the request should be handled now.

Reimplemented in TAO_Notify_SequencePushConsumer.

Definition at line 118 of file Consumer.cpp.

{
  ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock (), false);
  if (! this->pending_events().is_empty ())
    {
      if (DEBUG_LEVEL > 3)
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("Consumer %d: enqueuing another event. %d\n"),
                    static_cast<int> (this->proxy ()->id ()),
                    request->sequence ()
                    ));
      TAO_Notify_Event::Ptr event (
        request->event ()->queueable_copy ());
      TAO_Notify_Method_Request_Event_Queueable * queue_entry;
      ACE_NEW_THROW_EX (queue_entry,
                        TAO_Notify_Method_Request_Event_Queueable (*request,
                                                                   event),
                        CORBA::NO_MEMORY ());
      this->pending_events().enqueue_tail (queue_entry);
      this->schedule_timer (false);
      return true;
    }
  if (this->is_suspended_ == 1)
    {
      if (DEBUG_LEVEL > 3)
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("Suspended Consumer %d enqueing event. %d\n"),
                    static_cast<int> (this->proxy ()->id ()),
                    request->sequence ()
                    ));
      TAO_Notify_Event::Ptr event (
        request->event ()->queueable_copy ());
      TAO_Notify_Method_Request_Event_Queueable * queue_entry;
      ACE_NEW_THROW_EX (queue_entry,
                        TAO_Notify_Method_Request_Event_Queueable (*request,
                                                                   event),
                        CORBA::NO_MEMORY ());
      this->pending_events().enqueue_tail (queue_entry);
      this->schedule_timer (false);
      return true;
    }
  return false;
}
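
Both branches of the listing perform the same enqueue, so the decision reduces to a single condition: queue the event if anything is already pending or if the consumer is suspended. A small sketch of that rule, using a hypothetical QueueModel with integer events and no locking or timer:

```cpp
#include <cassert>
#include <deque>

// Hypothetical model of the enqueue decision only.
struct QueueModel
{
  std::deque<int> pending;
  bool suspended;

  QueueModel () : suspended (false) {}

  // Returns true if the event was queued, false if the caller should
  // dispatch it immediately.
  bool enqueue_if_necessary (int event)
  {
    if (!pending.empty () || suspended)
      {
        pending.push_back (event); // keep arrival order behind older events
        return true;
      }
    return false;
  }
};
```

Queuing behind existing entries keeps a new event from overtaking older ones that are still waiting for a retry.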

void TAO_Notify_Consumer::enqueue_request ( TAO_Notify_Method_Request_Event * request  )  [protected]

Definition at line 96 of file Consumer.cpp.

{
  TAO_Notify_Event::Ptr event (
    request->event ()->queueable_copy ());

  TAO_Notify_Method_Request_Event_Queueable * queue_entry;
  ACE_NEW_THROW_EX (queue_entry,
    TAO_Notify_Method_Request_Event_Queueable (*request, event),
    CORBA::NO_MEMORY ());

  if (DEBUG_LEVEL  > 3) ACE_DEBUG ( (LM_DEBUG,
    ACE_TEXT ("Consumer %d: enqueue_request (%d) @%@.\n"),
    static_cast<int> (this->proxy ()->id ()),
    request->sequence (),
    request
    ));
  ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->proxy_lock ());
  this->pending_events().enqueue_tail (queue_entry);
}

virtual CORBA::Object_ptr TAO_Notify_Consumer::get_consumer ( void   )  [protected, pure virtual]

This method is called by the is_alive() method. It should provide the connected consumer or nil if there is none.

Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.

int TAO_Notify_Consumer::handle_timeout ( const ACE_Time_Value & current_time,
const void *  act = 0 
) [protected, virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 661 of file Consumer.cpp.

{
  if (!this->is_suspended() && this->timer_.isSet() && this->timer_id_ != -1)
    {
      TAO_Notify_Consumer::Ptr grd (this);
      this->timer_id_ = -1;  // This must come first, because dispatch_pending may try to resched
      try
        {
          this->dispatch_pending ();
        }
      catch (...)
        {
        }
    }

  return 0;
}

bool TAO_Notify_Consumer::is_alive ( bool  allow_nil_consumer  ) 

Is the connected consumer still around?

Definition at line 742 of file Consumer.cpp.

{
  bool status = false;
  CORBA::Object_var consumer = this->get_consumer ();
  if (CORBA::is_nil (consumer.in ()))
  {
    // The consumer may not be connected or the consumer did
    // not provide a callback. In this case, the liveliness
    // check should return true so it will be validated in
    // the next period.
    if (allow_nil_consumer)
      return true;
    else
      return status;
  }

  CORBA::PolicyList policy_list;
  try
    {
      bool do_liveliness_check = false;
      ACE_Time_Value now = ACE_OS::gettimeofday ();

      if (CORBA::is_nil (this->rtt_obj_.in ()))
      {
        // We need to determine if the consumer on the other end is still
        // alive.  Since we may be in an upcall from the owner of the
        // original consumer, we have to put a timeout on the call in case
        // the client side is not processing ORB requests at this time.  In
        // the event that the timeout exception occurs, we will assume that
        // the original consumer is still around.  If we get any other
        // exception we will say that the original consumer is not
        // available anymore.
        TimeBase::TimeT timeout = 10000000;
        CORBA::Any timeout_any;
        timeout_any <<= timeout;

        policy_list.length (1);
        policy_list[0] = TAO_Notify_PROPERTIES::instance()->orb()->
                          create_policy (
                                Messaging::RELATIVE_RT_TIMEOUT_POLICY_TYPE,
                                timeout_any);
        rtt_obj_ =
          consumer->_set_policy_overrides (policy_list,
                                          CORBA::ADD_OVERRIDE);

        // Clean up the policy that was allocated in the try/catch
        for (CORBA::ULong i = 0; i < policy_list.length (); i++)
          policy_list[i]->destroy ();

        do_liveliness_check 
          = (last_ping_ == ACE_Time_Value::zero ? true 
          : now - last_ping_.value () >= TAO_Notify_PROPERTIES::instance()->validate_client_delay ());
      }
      else
        do_liveliness_check = 
          now - last_ping_.value () >= TAO_Notify_PROPERTIES::instance()->validate_client_interval ();

      if (CORBA::is_nil (rtt_obj_.in ()))
        status = false;
      else if (do_liveliness_check || allow_nil_consumer)
      {
        last_ping_ = now;
        status = !rtt_obj_->_non_existent ();
      }
      else
        status = true;
    }
  catch (CORBA::TIMEOUT&)
    {
       status = true;
    }
  catch (CORBA::Exception& ex)
    {
      if (DEBUG_LEVEL > 0)
      {        
        ex._tao_print_exception ("TAO_Notify_Consumer::is_alive: false");
      }
    }

  return status;
}
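
The listing throttles the remote _non_existent call: the first check uses validate_client_delay (or runs immediately if no ping has been recorded), and later checks use validate_client_interval. The timing rule alone can be sketched as below; liveliness_check_due and its parameters are hypothetical names, and whole seconds are used only to keep the example simple.

```cpp
#include <cassert>
#include <chrono>

typedef std::chrono::seconds Secs;

// Hypothetical helper: first_check corresponds to rtt_obj_ still being nil.
inline bool liveliness_check_due (Secs now, Secs last_ping, bool first_check,
                                  Secs initial_delay, Secs interval)
{
  if (first_check)
    {
      // No ping recorded yet means the check runs right away.
      return last_ping == Secs::zero () || now - last_ping >= initial_delay;
    }
  return now - last_ping >= interval;
}
```

In the listing this result is combined with allow_nil_consumer, which forces the remote call regardless of elapsed time.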

CORBA::Boolean TAO_Notify_Consumer::is_suspended ( void   ) 

Is the connection suspended?

Definition at line 16 of file Consumer.inl.

{
  return this->is_suspended_;
}

size_t TAO_Notify_Consumer::pending_count ( void   ) 

Estimate how many events are pending delivery for this consumer.

The estimate does not include events queued at the admin level which have not been passed to this consumer for delivery yet.

Definition at line 28 of file Consumer.inl.

{
  return this->pending_events_->size();
}

TAO_Notify_Consumer::Request_Queue & TAO_Notify_Consumer::pending_events (  )  [protected]

= Protected Data Members

Definition at line 9 of file Consumer.inl.

TAO_Notify_Proxy * TAO_Notify_Consumer::proxy ( void   )  [virtual]

Access Base Proxy.

Definition at line 76 of file Consumer.cpp.

{
  return this->proxy_supplier ();
}

TAO_SYNCH_MUTEX * TAO_Notify_Consumer::proxy_lock ( void   )  [protected]

Get the shared Proxy Lock.

Definition at line 704 of file Consumer.cpp.

{
  return &this->proxy_->lock_;
}

TAO_Notify_ProxySupplier * TAO_Notify_Consumer::proxy_supplier ( void   ) 

Access Specific Proxy.

Definition at line 710 of file Consumer.cpp.

{
  return this->proxy_;
}

virtual void TAO_Notify_Consumer::push ( const CosNotification::EventBatch & event  )  [pure virtual]

Push a batch of events to this consumer.

Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.

virtual void TAO_Notify_Consumer::push ( const CORBA::Any & event  )  [pure virtual]

Push event to this consumer.

virtual void TAO_Notify_Consumer::push ( const CosNotification::StructuredEvent & event  )  [pure virtual]

Push event to this consumer.

void TAO_Notify_Consumer::qos_changed ( const TAO_Notify_QoSProperties & qos_properties  )  [virtual]

Override, Peer::qos_changed.

Definition at line 82 of file Consumer.cpp.

{
  this->max_batch_size_ = qos_properties.maximum_batch_size ();
}

virtual void TAO_Notify_Consumer::reconnect_from_consumer ( TAO_Notify_Consumer * old_consumer  )  [pure virtual]

On reconnect, events must be moved from the old consumer to the new one.

Implemented in TAO_Notify_PushConsumer, TAO_Notify_SequencePushConsumer, and TAO_Notify_StructuredPushConsumer.

void TAO_Notify_Consumer::resume ( void   ) 

Resume Connection.

Definition at line 88 of file Consumer.cpp.

{
  this->is_suspended_ = 0;

  this->dispatch_pending ();
}

void TAO_Notify_Consumer::schedule_timer ( bool  is_error = false  )  [protected]

Schedule timer.

Definition at line 594 of file Consumer.cpp.

{
  if (this->timer_id_ != -1)
    {
      return; // We only want a single timeout scheduled.
    }
  // don't schedule timer if there's nothing that can be done
  if (this->is_suspended ())
    {
      return;
    }

  ACE_ASSERT (this->timer_.get() != 0);

  // If we're scheduling the timer due to an error then we want to
  // use the retry timeout, otherwise we'll assume that the pacing
  // interval is sufficient for now.
  ACE_Time_Value tv (DEFAULT_RETRY_TIMEOUT);

  if (! is_error)
    {
      if (this->pacing_.is_valid ())
        {
          tv = ORBSVCS_Time::to_Time_Value (this->pacing_.value ());
        }
    }

  if (DEBUG_LEVEL  > 5)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("Consumer %d: scheduling pacing/retry for %dms.\n"),
                  static_cast<int> (this->proxy ()->id ()), tv.msec ()));
    }

  this->timer_id_ =
    this->timer_->schedule_timer (this, tv, ACE_Time_Value::zero);
  if (this->timer_id_ == -1)
    {
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("TAO_Notify_Consumer %d::schedule_timer () ")
                  ACE_TEXT ("Error scheduling timer.\n"),
                  static_cast<int> (this->proxy ()->id ())
                  ));
    }
  if (this->is_suspended()) // double check to avoid race
  {
    this->cancel_timer();
  }
}
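
The delay chosen above depends on two inputs: whether the timer is being scheduled because of an error, and whether a valid pacing interval is configured. A sketch of just that selection follows. Pacing, timer_delay_msec and the DEFAULT_RETRY_MSEC value are hypothetical; the real DEFAULT_RETRY_TIMEOUT is defined in the TAO sources and is not reproduced here.

```cpp
#include <cassert>

// Hypothetical stand-in for the pacing QoS property.
struct Pacing
{
  bool valid;
  long msec;
};

// Placeholder value chosen for this sketch only.
const long DEFAULT_RETRY_MSEC = 10000;

// On error, always use the retry timeout; otherwise prefer the pacing
// interval when one is configured.
inline long timer_delay_msec (bool is_error, const Pacing &pacing)
{
  if (!is_error && pacing.valid)
    return pacing.msec;
  return DEFAULT_RETRY_MSEC;
}
```

With these stand-ins, an error always selects the retry timeout even when a pacing interval is set, matching the order of the checks in the listing.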

void TAO_Notify_Consumer::shutdown ( void   )  [virtual]

Shutdown the consumer.

Definition at line 680 of file Consumer.cpp.

{
  this->suspend();
  if (this->timer_.isSet ())
    {
      this->cancel_timer ();
      this->timer_.reset ();
    }
}

void TAO_Notify_Consumer::suspend ( void   ) 

Suspend Connection.

Definition at line 22 of file Consumer.inl.

{
  this->is_suspended_ = 1;
}


Member Data Documentation

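bool TAO_Notify_Consumer::have_not_yet_verified_publish_ [protected]

Definition at line 192 of file Consumer.h.

CORBA::Boolean TAO_Notify_Consumer::is_suspended_ [protected]

Suspended Flag.

Definition at line 188 of file Consumer.h.

ACE_Atomic_Op< TAO_SYNCH_MUTEX, ACE_Time_Value > TAO_Notify_Consumer::last_ping_ [protected]

Last time an event was pushed or the connection was validated via a _non_existent call.

Definition at line 208 of file Consumer.h.

TAO_Notify_Property_Long TAO_Notify_Consumer::max_batch_size_ [protected]

Max. batch size.

Definition at line 198 of file Consumer.h.

const TAO_Notify_Property_Time & TAO_Notify_Consumer::pacing_ [protected]

The Pacing Interval.

Definition at line 195 of file Consumer.h.

ACE_Auto_Ptr< Request_Queue > TAO_Notify_Consumer::pending_events_ [private]

Events pending to be delivered.

Definition at line 213 of file Consumer.h.

TAO_Notify_ProxySupplier * TAO_Notify_Consumer::proxy_ [protected]

The Proxy that we associate with.

Definition at line 185 of file Consumer.h.

CosNotifyComm::NotifyPublish_var TAO_Notify_Consumer::publish_ [protected]

Interface that accepts offer_changes.

Definition at line 191 of file Consumer.h.

CORBA::Object_var TAO_Notify_Consumer::rtt_obj_ [private]

Definition at line 215 of file Consumer.h.

TAO_Notify_Timer::Ptr TAO_Notify_Consumer::timer_ [protected]

The Timer Manager that we use.

Definition at line 204 of file Consumer.h.

long TAO_Notify_Consumer::timer_id_ [protected]

Timer Id.

Definition at line 201 of file Consumer.h.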


The documentation for this class was generated from the following files: