Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Private Attributes

TAO::CSD::TP_Strategy Class Reference

A simple custom Thread-Pool servant dispatching strategy class. More...

#include <CSD_TP_Strategy.h>

Inheritance diagram for TAO::CSD::TP_Strategy:
Inheritance graph
[legend]
Collaboration diagram for TAO::CSD::TP_Strategy:
Collaboration graph
[legend]

List of all members.

Public Types

enum  CustomRequestOutcome { REQUEST_DISPATCHED, REQUEST_EXECUTED, REQUEST_CANCELLED, REQUEST_REJECTED }
 

Return codes for the custom dispatch_request() methods.

More...

Public Member Functions

 TP_Strategy (Thread_Counter num_threads=1, bool serialize_servants=true)
 Constructor.
virtual ~TP_Strategy ()
 Virtual Destructor.
void set_num_threads (Thread_Counter num_threads)
 Set the number of threads in the pool (must be > 0).
void set_servant_serialization (bool serialize_servants)
 Turn on/off serialization of servants.
CustomRequestOutcome custom_synch_request (TP_Custom_Request_Operation *op)
CustomRequestOutcome custom_asynch_request (TP_Custom_Request_Operation *op)
void cancel_requests (PortableServer::Servant servant)

Protected Member Functions

virtual
Strategy_Base::DispatchResult 
dispatch_remote_request_i (TAO_ServerRequest &server_request, const PortableServer::ObjectId &object_id, PortableServer::POA_ptr poa, const char *operation, PortableServer::Servant servant)
virtual
Strategy_Base::DispatchResult 
dispatch_collocated_request_i (TAO_ServerRequest &server_request, const PortableServer::ObjectId &object_id, PortableServer::POA_ptr poa, const char *operation, PortableServer::Servant servant)
virtual bool poa_activated_event_i (TAO_ORB_Core &orb_core)
virtual void poa_deactivated_event_i ()
virtual void servant_activated_event_i (PortableServer::Servant servant, const PortableServer::ObjectId &oid)
 Event - A servant has been activated.
virtual void servant_deactivated_event_i (PortableServer::Servant servant, const PortableServer::ObjectId &oid)
 Event - A servant has been deactivated.

Private Member Functions

TP_Servant_State::HandleType get_servant_state (PortableServer::Servant servant)

Private Attributes

TP_Task task_
Thread_Counter num_threads_
 The number of worker threads to use for the task.
bool serialize_servants_
 The "serialize servants" flag.
TP_Servant_State_Map servant_state_map_

Detailed Description

A simple custom Thread-Pool servant dispatching strategy class.

This class represents a concrete implementation of a "Custom Servant Dispatching Strategy". This implementation is being called the "Thread Pool Strategy" reference implementation.

A custom servant dispatching strategy object can be applied to a POA object in order to carry out the servant dispatching duties for that POA.

Definition at line 57 of file CSD_TP_Strategy.h.


Member Enumeration Documentation

enum TAO::CSD::TP_Strategy::CustomRequestOutcome

Return codes for the custom dispatch_request() methods.

Enumerator:
REQUEST_DISPATCHED 

The request was successfully put on the request queue.

REQUEST_EXECUTED 

The request has been executed/completed by a worker thread.

REQUEST_CANCELLED 

The request was removed from the queue and cancelled.

REQUEST_REJECTED 

The request queue rejected the request.

Definition at line 76 of file CSD_TP_Strategy.h.

      {
        /// The request was successfully put on the request queue.
        REQUEST_DISPATCHED,
        /// The request has been executed/completed by a worker thread.
        REQUEST_EXECUTED,
        /// The request was removed from the queue and cancelled.
        REQUEST_CANCELLED,
        /// The request queue rejected the request
        REQUEST_REJECTED
      };


Constructor & Destructor Documentation

TAO::CSD::TP_Strategy::TP_Strategy ( Thread_Counter  num_threads = 1,
bool  serialize_servants = true 
)

Constructor.

Definition at line 8 of file CSD_TP_Strategy.inl.

  : num_threads_(num_threads),
    serialize_servants_(serialize_servants)
{
  // Assumes that num_threads > 0.
}

TAO::CSD::TP_Strategy::~TP_Strategy (  )  [virtual]

Virtual Destructor.

Definition at line 23 of file CSD_TP_Strategy.cpp.

{
}


Member Function Documentation

void TAO::CSD::TP_Strategy::cancel_requests ( PortableServer::Servant  servant  ) 

Cancel all requests that are targeted for the provided servant. This is requested on the user application level.

Definition at line 245 of file CSD_TP_Strategy.cpp.

{
  // Cancel all requests stuck in the queue for the specified servant.
  this->task_.cancel_servant(servant);
}

TAO::CSD::TP_Strategy::CustomRequestOutcome TAO::CSD::TP_Strategy::custom_asynch_request ( TP_Custom_Request_Operation op  ) 

Inject an asynchronous, custom request into the request queue. This will return control to the calling thread once the request has been placed into the queue (or rejected). Will return REQUEST_DISPATCHED or REQUEST_REJECTED.

Definition at line 50 of file CSD_TP_Strategy.cpp.

{
  TP_Servant_State::HandleType servant_state =
                        this->get_servant_state(op->servant());

  TP_Custom_Asynch_Request_Handle request = new
                          TP_Custom_Asynch_Request(op, servant_state.in());

  return (this->task_.add_request(request.in()))
         ? REQUEST_DISPATCHED : REQUEST_REJECTED;
}

TAO::CSD::TP_Strategy::CustomRequestOutcome TAO::CSD::TP_Strategy::custom_synch_request ( TP_Custom_Request_Operation op  ) 

Inject a synchronous, custom request into the request queue. This will block the calling thread until the request is handled (dispatched or cancelled) or rejected. Will return REQUEST_EXECUTED, REQUEST_CANCELLED, or REQUEST_REJECTED.

Definition at line 30 of file CSD_TP_Strategy.cpp.

{
  TP_Servant_State::HandleType servant_state =
                        this->get_servant_state(op->servant());

  TP_Custom_Synch_Request_Handle request = new
                          TP_Custom_Synch_Request(op, servant_state.in());

  if (!this->task_.add_request(request.in()))
    {
      // The request was rejected by the task.
      return REQUEST_REJECTED;
    }

  // Now we wait until the request is handled (executed or cancelled).
  return (request->wait()) ? REQUEST_EXECUTED : REQUEST_CANCELLED;
}

TAO::CSD::Strategy_Base::DispatchResult TAO::CSD::TP_Strategy::dispatch_collocated_request_i ( TAO_ServerRequest server_request,
const PortableServer::ObjectId object_id,
PortableServer::POA_ptr  poa,
const char *  operation,
PortableServer::Servant  servant 
) [protected, virtual]

Handle the dispatching of a collocated request.

This will cause a new "request" object to be created and pushed on to a "request queue". The worker threads are responsible for servicing the queue, and performing the actual dispatch logic.

Implements TAO::CSD::Strategy_Base.

Definition at line 124 of file CSD_TP_Strategy.cpp.

{
  TP_Servant_State::HandleType servant_state =
                        this->get_servant_state(servant);

  bool is_sync_with_server = server_request.sync_with_server();
  bool is_synchronous      = server_request.response_expected();

  TP_Collocated_Synch_Request_Handle             synch_request;
  TP_Collocated_Synch_With_Server_Request_Handle synch_with_server_request;
  TP_Request_Handle                              request;

  // Create the request object using the appropriate concrete type.
  if (is_sync_with_server)
    {
      synch_with_server_request =
                        new TP_Collocated_Synch_With_Server_Request
                                                     (server_request,
                                                      object_id,
                                                      poa,
                                                      operation,
                                                      servant,
                                                      servant_state.in());

      // Give the request handle its own "copy".
      synch_with_server_request->_add_ref();
      request = synch_with_server_request.in();
    }
  else if (is_synchronous)
    {
      synch_request = new TP_Collocated_Synch_Request(server_request,
                                                      object_id,
                                                      poa,
                                                      operation,
                                                      servant,
                                                      servant_state.in());

      // Give the request handle its own "copy".
      synch_request->_add_ref();
      request = synch_request.in();
    }
  else
    {
      // Just use the (base) request handle to hold the request object.
      request = new TP_Collocated_Asynch_Request(server_request,
                                                 object_id,
                                                 poa,
                                                 operation,
                                                 servant,
                                                 servant_state.in());
    }

  // Hand the request object to our task so that it can add the request
  // to its "request queue".
  if (!this->task_.add_request(request.in()))
    {
      // Return the DISPATCH_REJECTED return code so that the caller (our
      // base class' dispatch_request() method) knows that we did
      // not handle the request, and that it should be rejected.
      return DISPATCH_REJECTED;
    }

  // We need to wait on the request object if the request type is a
  // synchronous request.
  if (!synch_request.is_nil())
    {
      int srw = synch_request->wait();
      if (srw == false)
        {
          // Raise exception when request was cancelled.
          throw ::CORBA::NO_IMPLEMENT();
        }
    }
  else if (!synch_with_server_request.is_nil())
    {
      bool swsr = synch_with_server_request->wait();
      if (swsr == false)
        {
          // Raise exception when request was cancelled.
          throw ::CORBA::NO_IMPLEMENT();
        }
    }

  return DISPATCH_HANDLED;
}

TAO::CSD::Strategy_Base::DispatchResult TAO::CSD::TP_Strategy::dispatch_remote_request_i ( TAO_ServerRequest server_request,
const PortableServer::ObjectId object_id,
PortableServer::POA_ptr  poa,
const char *  operation,
PortableServer::Servant  servant 
) [protected, virtual]

Handle the dispatching of a remote request.

This will cause a new "request" object to be created and pushed on to a "request queue". The worker threads are responsible for servicing the queue, and performing the actual dispatch logic.

Implements TAO::CSD::Strategy_Base.

Definition at line 86 of file CSD_TP_Strategy.cpp.

{
  TP_Servant_State::HandleType servant_state =
                        this->get_servant_state(servant);

  // Now we can create the TP_Remote_Request object, and then add it to our
  // task_'s "request queue".
  //
  // TBD-CSD: Need to use a Cached Allocator to "create" the
  //          TP_Remote_Request objects.  For now, use the heap.
  TP_Remote_Request_Handle request =
                            new TP_Remote_Request(server_request,
                                                  object_id,
                                                  poa,
                                                  operation,
                                                  servant,
                                                  servant_state.in());

  // Hand the request object to our task so that it can add the request
  // to its "request queue".
  if (!this->task_.add_request(request.in()))
    {
      // Return the DISPATCH_REJECTED return code so that the caller (our
      // base class' dispatch_request() method) knows that we did
      // not handle the request, and that it should be rejected.
      return TAO::CSD::Strategy_Base::DISPATCH_REJECTED;
    }

    return TAO::CSD::Strategy_Base::DISPATCH_HANDLED;
}

TAO::CSD::TP_Servant_State::HandleType TAO::CSD::TP_Strategy::get_servant_state ( PortableServer::Servant  servant  )  [private]

Helper method that is responsible for looking up the servant state object in the servant state map *if* the "serialize servants" flag is set to true. In the case where the "serialize servants" flag is set to false, then a "nil" servant state handle object is returned.

Parameters:
servant - input - a pointer to the servant object.
Returns:
a handle to a servant state object.
Exceptions:
PortableServer::POA::ServantNotActive if the servant state cannot be determined.

Definition at line 253 of file CSD_TP_Strategy.cpp.

{
  TP_Servant_State::HandleType servant_state;

  if (this->serialize_servants_)
    {
      servant_state = this->servant_state_map_.find(servant);
    }

  return servant_state;
}

bool TAO::CSD::TP_Strategy::poa_activated_event_i ( TAO_ORB_Core orb_core  )  [protected, virtual]

Event - The POA has been activated. This will activate the worker thread(s). Returns true if the worker threads were activated successfully. Otherwise, returns false.

Implements TAO::CSD::Strategy_Base.

Definition at line 64 of file CSD_TP_Strategy.cpp.

{
  this->task_.thr_mgr(orb_core.thr_mgr());
  // Activates the worker threads, and waits until all have been started.
  return (this->task_.open(&(this->num_threads_)) == 0);
}

void TAO::CSD::TP_Strategy::poa_deactivated_event_i (  )  [protected, virtual]

Event - The POA has been deactivated. This will shutdown the worker thread(s).

Implements TAO::CSD::Strategy_Base.

Definition at line 73 of file CSD_TP_Strategy.cpp.

{
  // Passing in a value of 1 means that we want to shutdown the task, which
  // equates to causing all worker threads to shutdown.  The worker threads
  // themselves will also invoke the close() method, but the passed-in value
  // will be 0.  So, a 1 means "shutdown", and a 0 means "a single worker
  // thread is going away".
  this->task_.close(1);
}

void TAO::CSD::TP_Strategy::servant_activated_event_i ( PortableServer::Servant  servant,
const PortableServer::ObjectId oid 
) [protected, virtual]

Event - A servant has been activated.

Reimplemented from TAO::CSD::Strategy_Base.

Definition at line 217 of file CSD_TP_Strategy.cpp.

{
  if (this->serialize_servants_)
    {
      // Add the servant to the servant state map.
      this->servant_state_map_.insert(servant);
    }
}

void TAO::CSD::TP_Strategy::servant_deactivated_event_i ( PortableServer::Servant  servant,
const PortableServer::ObjectId oid 
) [protected, virtual]

Event - A servant has been deactivated.

Reimplemented from TAO::CSD::Strategy_Base.

Definition at line 230 of file CSD_TP_Strategy.cpp.

{
  // Cancel all requests stuck in the queue for the specified servant.
  this->task_.cancel_servant(servant);

  if (this->serialize_servants_)
    {
      // Remove the servant from the servant state map.
      this->servant_state_map_.remove(servant);
    }
}

void TAO::CSD::TP_Strategy::set_num_threads ( Thread_Counter  num_threads  ) 

Set the number of threads in the pool (must be > 0).

Definition at line 19 of file CSD_TP_Strategy.inl.

{
  // Simple Mutator.  Assumes that num_threads > 0.
  this->num_threads_ = num_threads;
}

void TAO::CSD::TP_Strategy::set_servant_serialization ( bool  serialize_servants  ) 

Turn on/off serialization of servants.

Definition at line 28 of file CSD_TP_Strategy.inl.

{
  // Simple Mutator.
  this->serialize_servants_ = serialize_servants;
}


Member Data Documentation

Thread_Counter TAO::CSD::TP_Strategy::num_threads_ [private]

The number of worker threads to use for the task.

Definition at line 180 of file CSD_TP_Strategy.h.

bool TAO::CSD::TP_Strategy::serialize_servants_ [private]

The "serialize servants" flag.

Definition at line 183 of file CSD_TP_Strategy.h.

TP_Servant_State_Map TAO::CSD::TP_Strategy::servant_state_map_ [private]

The map of servant state objects - only used when the "serialize servants" flag is set to true.

Definition at line 187 of file CSD_TP_Strategy.h.

TP_Task TAO::CSD::TP_Strategy::task_ [private]

This is the active object used by the worker threads. The request queue is owned/managed by the task object. The strategy object puts requests into the task's request queue, and the worker threads service the queued requests by performing the actual servant request dispatching logic.

Definition at line 177 of file CSD_TP_Strategy.h.


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines