
TAO::CSD::TP_Task Class Reference

Active Object managing a queue of request objects.

#include <CSD_TP_Task.h>

Public Member Functions

 TP_Task ()
 Default Constructor.
virtual ~TP_Task ()
 Virtual Destructor.
bool add_request (TP_Request *request)
virtual int open (void *num_threads_ptr=0)
 Activate the worker threads.
virtual int svc ()
 The "mainline" executed by each worker thread.
virtual int close (u_long flag=0)
void cancel_servant (PortableServer::Servant servant)
 Cancel all requests that are targeted for the provided servant.

Private Types

enum  { MAX_THREADPOOL_TASK_WORKER_THREADS = 50 }
typedef TAO_SYNCH_MUTEX LockType
typedef ACE_Guard< LockType > GuardType
typedef TAO_Condition< LockType > ConditionType
typedef ACE_Vector< ACE_thread_t > Thread_Ids

Private Attributes

LockType lock_
 Lock to protect the "state" (all of the data members) of this object.
ConditionType work_available_
ConditionType active_workers_
bool accepting_requests_
bool shutdown_initiated_
 Flag used to initiate a shutdown request to all worker threads.
bool deferred_shutdown_initiated_
bool opened_
 Flag used to avoid multiple open() calls.
Thread_Counter num_threads_
 The number of currently active worker threads.
TP_Queue queue_
 The queue of pending servant requests (a.k.a. the "request queue").
Thread_Ids activated_threads_
 The list of ids for the threads launched by this task.

Detailed Description

Active Object managing a queue of request objects.

There are two types of "users" of a TP_Task object:

1) The TP_Strategy object that "owns" this task object.
2) The worker threads that "run" this task object as an "active object".

The TP_Strategy object that "owns" this task object dictates when the worker threads are activated and when they are shut down. It also injects requests into this task's queue via calls to the add_request() method, and it dictates the number of worker threads to be activated via a call to the set_num_threads() method.

The active object pattern is implemented via the ACE_Task_Base base class. Each worker thread invokes this task's svc() method; when svc() returns, the worker thread invokes this task's close() method (with the flag argument equal to 0).
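
The split between the owner and the worker threads can be illustrated with a minimal sketch built on the standard library. This is not TAO code: MiniTask, its members, the use of std::function as a request, and run_demo() are all hypothetical names chosen for illustration, and the sketch omits details of the real class (for example, the real open() waits until every worker has started).

```cpp
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for TP_Task, built only on the standard library.
class MiniTask {
public:
  // Owner side: start n worker threads, then begin accepting requests.
  void open(unsigned n) {
    std::lock_guard<std::mutex> guard(lock_);
    for (unsigned i = 0; i < n; ++i)
      workers_.emplace_back([this] { svc(); });
    accepting_ = true;
  }

  // Owner side: enqueue a request. Returns false if it is rejected.
  bool add_request(std::function<void()> request) {
    std::lock_guard<std::mutex> guard(lock_);
    if (!accepting_) return false;
    queue_.push_back(std::move(request));
    work_available_.notify_one();
    return true;
  }

  // Owner side: stop accepting, wake every worker, and join them.
  void close() {
    {
      std::lock_guard<std::mutex> guard(lock_);
      accepting_ = false;
      shutdown_ = true;
    }
    work_available_.notify_all();
    for (std::thread& w : workers_) w.join();
    workers_.clear();
  }

private:
  // Worker side: the "mainline" executed by each worker thread.
  void svc() {
    for (;;) {
      std::function<void()> request;
      {
        std::unique_lock<std::mutex> guard(lock_);
        work_available_.wait(guard,
                             [this] { return shutdown_ || !queue_.empty(); });
        if (shutdown_) return;  // Pending requests are simply dropped here.
        request = std::move(queue_.front());
        queue_.pop_front();
      }
      request();  // Dispatch without holding the lock.
    }
  }

  std::mutex lock_;
  std::condition_variable work_available_;
  std::deque<std::function<void()>> queue_;
  std::vector<std::thread> workers_;
  bool accepting_ = false;
  bool shutdown_ = false;
};

// Usage sketch: returns the number of requests that actually ran.
int run_demo() {
  MiniTask task;
  std::atomic<int> ran{0};
  std::promise<void> done;
  bool early = task.add_request([&] { ++ran; });  // Rejected: not open yet.
  task.open(2);
  task.add_request([&] { ++ran; done.set_value(); });
  done.get_future().wait();  // Make sure a worker dispatched the request.
  task.close();
  bool late = task.add_request([&] { ++ran; });   // Rejected: already closed.
  return (early || late) ? -1 : ran.load();
}
```

In this sketch only the request accepted between open() and close() is dispatched; the two requests submitted outside that window are rejected, which is the role accepting_requests_ plays in the real class.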

Note:
I just wanted to document an idea... When the pool consists of only one worker thread, we couldn't care less about checking whether target servant objects are busy. The simple fact that only one thread will be dispatching all requests means that servant objects will never be busy when the thread tests to see if a request is "ready_for_dispatch()". I'm just wondering if this knowledge can be applied to the implementation such that the "pool with one worker thread" case performs more efficiently. This is STP vs SSTP.

Definition at line 77 of file CSD_TP_Task.h.


Member Typedef Documentation

typedef TAO_Condition<LockType> TAO::CSD::TP_Task::ConditionType [private]

Definition at line 111 of file CSD_TP_Task.h.

typedef ACE_Guard<LockType> TAO::CSD::TP_Task::GuardType [private]

Definition at line 110 of file CSD_TP_Task.h.

typedef TAO_SYNCH_MUTEX TAO::CSD::TP_Task::LockType [private]

Definition at line 109 of file CSD_TP_Task.h.

typedef ACE_Vector<ACE_thread_t> TAO::CSD::TP_Task::Thread_Ids [private]

Definition at line 150 of file CSD_TP_Task.h.


Member Enumeration Documentation

anonymous enum [private]
Enumerator:
MAX_THREADPOOL_TASK_WORKER_THREADS 

Definition at line 155 of file CSD_TP_Task.h.


Constructor & Destructor Documentation

TAO::CSD::TP_Task::TP_Task (  ) 

Default Constructor.

Definition at line 8 of file CSD_TP_Task.inl.

  : work_available_(this->lock_),
    active_workers_(this->lock_),
    accepting_requests_(false),
    shutdown_initiated_(false),
    deferred_shutdown_initiated_(false),
    opened_(false),
    num_threads_(0),
    activated_threads_ ((size_t)MAX_THREADPOOL_TASK_WORKER_THREADS)
{
}

TAO::CSD::TP_Task::~TP_Task (  )  [virtual]

Virtual Destructor.

Definition at line 18 of file CSD_TP_Task.cpp.

{
}


Member Function Documentation

bool TAO::CSD::TP_Task::add_request ( TP_Request *  request  ) 

Put a request object on to the request queue. Returns true if successful, false otherwise (it has been "rejected").

Definition at line 24 of file CSD_TP_Task.cpp.

{
  GuardType guard(this->lock_);

  if (!this->accepting_requests_)
    {
      ACE_DEBUG((LM_DEBUG,"(%P|%t) TP_Task::add_request() - "
                 "not accepting requests\n"));
      return false;
    }

  // We have made the decision that the request is going to be placed upon
  // the queue_.  Inform the request that it is about to be placed into
  // a request queue.  Some requests may not need to do anything in
  // preparation of being placed into a queue.  Others, however, may need
  // to perform a "clone" operation on some underlying request data before
  // the request can be properly placed into a queue.
  request->prepare_for_queue();

  this->queue_.put(request);

  this->work_available_.signal();

  return true;
}
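
The comment about prepare_for_queue() describes requests that must copy borrowed data before they can safely sit in a queue. A minimal sketch of that idea follows, under the assumption that a request initially points at data owned by its caller. Request, BorrowingRequest, payload(), and demo_prepare() are hypothetical names; the real TP_Request hierarchy is not reproduced here.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical request interface, loosely modeled on the hook used above.
struct Request {
  virtual ~Request() {}
  virtual void prepare_for_queue() {}  // Default: nothing to prepare.
  virtual std::string payload() const = 0;
};

// A request that starts out borrowing data owned by the caller.
class BorrowingRequest : public Request {
public:
  explicit BorrowingRequest(const std::string* borrowed)
    : borrowed_(borrowed) {
    assert(borrowed != nullptr);
  }

  // Take a private copy so the request can outlive the caller's data.
  void prepare_for_queue() override {
    owned_ = *borrowed_;
    borrowed_ = &owned_;
  }

  std::string payload() const override { return *borrowed_; }

private:
  const std::string* borrowed_;
  std::string owned_;
};

// Usage sketch: the caller's buffer is destroyed before the payload is read.
std::string demo_prepare() {
  std::unique_ptr<BorrowingRequest> request;
  {
    std::string caller_data = "args";
    request.reset(new BorrowingRequest(&caller_data));
    request->prepare_for_queue();  // Called just before enqueueing.
  }  // caller_data is gone, but the request holds its own copy.
  return request->payload();
}
```

Without the call to prepare_for_queue(), the final payload() read in this sketch would dereference a dangling pointer, which is the kind of problem the hook exists to prevent.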

void TAO::CSD::TP_Task::cancel_servant ( PortableServer::Servant  servant  ) 

Cancel all requests that are targeted for the provided servant.

Definition at line 308 of file CSD_TP_Task.cpp.

{
  GuardType guard(this->lock_);

  // Cancel the requests targeted for the provided servant.
  TP_Cancel_Visitor cancel_visitor(servant);
  this->queue_.accept_visitor(cancel_visitor);
}

int TAO::CSD::TP_Task::close ( u_long  flag = 0  )  [virtual]

Multi-purpose: argument value is used to differentiate purpose.

0) Invoked by each worker thread after its invocation of the svc() method has completed (i.e., returned).
1) Invoked by the strategy object to shut down all worker threads.

Reimplemented from ACE_Task_Base.

Definition at line 240 of file CSD_TP_Task.cpp.

{
  GuardType guard(this->lock_);

  if (flag == 0)
    {
      // Worker thread is closing.
      --this->num_threads_;
      this->active_workers_.signal();
    }
  else
    {
      // Strategy object is shutting down the task.

      // Do nothing if this task has never been open()'ed.
      if (!this->opened_)
        {
          return 0;
        }

      // Set the shutdown flag to true.
      this->shutdown_initiated_ = true;

      // Stop accepting requests.
      this->accepting_requests_ = false;

      // Signal all worker threads waiting on the work_available_ condition.
      this->work_available_.broadcast();

      bool calling_thread_in_tp = false;

      ACE_thread_t my_thr_id = ACE_OS::thr_self ();

      // Check whether the calling thread (calling orb shutdown) is one of the
      // threads in the pool. If it is, then it should not wait for itself.
      size_t const size = this->activated_threads_.size ();

      for (size_t i = 0; i < size; i ++)
        {
          if (this->activated_threads_[i] == my_thr_id)
            {
              calling_thread_in_tp = true;
              this->deferred_shutdown_initiated_ = true;
              break;
            }
        }

      // Wait until all worker threads have shutdown.
      size_t target_num_threads = calling_thread_in_tp ? 1 : 0;
      while (this->num_threads_ != target_num_threads)
        {
          this->active_workers_.wait();
        }

      // Cancel all requests.
      TP_Cancel_Visitor cancel_visitor;
      this->queue_.accept_visitor(cancel_visitor);

      this->opened_ = false;
      this->shutdown_initiated_ = false;
    }

  return 0;
}
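
The wait loop in the shutdown branch targets a thread count of 1 rather than 0 when the caller is itself a pool thread, because that thread cannot exit while it is blocked inside close(). The sketch below isolates that accounting using standard C++ primitives. WorkerTracker, its methods, and demo_shutdown() are hypothetical, and a single condition variable stands in for the two conditions used by the real class.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper that isolates the thread accounting used by close(1).
class WorkerTracker {
public:
  // Worker side: record this thread and bump the running count.
  void enter() {
    std::lock_guard<std::mutex> guard(lock_);
    ids_.push_back(std::this_thread::get_id());
    ++running_;
    changed_.notify_all();
  }

  // Worker side: block until a shutdown has been requested.
  void idle_until_shutdown() {
    std::unique_lock<std::mutex> guard(lock_);
    changed_.wait(guard, [this] { return shutdown_; });
  }

  // Worker side: this thread is about to exit.
  void leave() {
    std::lock_guard<std::mutex> guard(lock_);
    assert(running_ > 0);
    --running_;
    changed_.notify_all();
  }

  // Block until exactly n workers have entered.
  void wait_until_started(std::size_t n) {
    std::unique_lock<std::mutex> guard(lock_);
    changed_.wait(guard, [this, n] { return running_ == n; });
  }

  // Request shutdown and wait for the other workers to leave.
  // Returns the thread count that was waited for (0 or 1).
  std::size_t shutdown() {
    std::unique_lock<std::mutex> guard(lock_);
    shutdown_ = true;
    changed_.notify_all();
    const bool self_in_pool =
        std::find(ids_.begin(), ids_.end(), std::this_thread::get_id()) !=
        ids_.end();
    const std::size_t target = self_in_pool ? 1 : 0;
    changed_.wait(guard, [this, target] { return running_ == target; });
    return target;
  }

private:
  std::mutex lock_;
  std::condition_variable changed_;
  std::vector<std::thread::id> ids_;
  std::size_t running_ = 0;
  bool shutdown_ = false;
};

// Usage sketch: two idle workers, shut down from outside or inside the pool.
std::size_t demo_shutdown(bool caller_in_pool) {
  WorkerTracker t;
  std::size_t target = 99;
  std::vector<std::thread> pool;
  for (int i = 0; i < 2; ++i)
    pool.emplace_back([&t] { t.enter(); t.idle_until_shutdown(); t.leave(); });
  if (caller_in_pool) {
    pool.emplace_back([&t, &target] {
      t.enter();
      t.wait_until_started(3);
      target = t.shutdown();  // Must not wait for itself.
      t.leave();
    });
  } else {
    t.wait_until_started(2);
    target = t.shutdown();
  }
  for (std::thread& th : pool) th.join();
  return target;
}
```

If the pool-thread caller waited for a count of 0 in this sketch, it would block forever, since its own leave() can only run after shutdown() returns.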

int TAO::CSD::TP_Task::open ( void *  num_threads_ptr = 0  )  [virtual]

Activate the worker threads.

Reimplemented from ACE_Task_Base.

Definition at line 52 of file CSD_TP_Task.cpp.

{
  Thread_Counter num = 1;

  if (num_threads_ptr != 0)
    {
      Thread_Counter* tmp = static_cast<Thread_Counter*> (num_threads_ptr);

      if (tmp == 0)
        {
          //FUZZ: disable check_for_lack_ACE_OS
          ACE_ERROR_RETURN((LM_ERROR,
                            ACE_TEXT ("(%P|%t) TP_Task failed to open.  ")
                            ACE_TEXT ("Invalid argument type passed to open().\n")),
                           -1);
          //FUZZ: enable check_for_lack_ACE_OS
        }

      num = *tmp;
    }

  // We can't activate 0 threads.  Make sure this isn't the case.
  if (num < 1)
    {
      ACE_ERROR_RETURN((LM_ERROR,
                        ACE_TEXT ("(%P|%t) TP_Task failed to open.  ")
                        ACE_TEXT ("num_threads (%u) is less-than 1.\n"),
                        num),
                       -1);
    }

  // Likewise, we can't activate too many.  Make sure this isn't the case.
  if (num > MAX_THREADPOOL_TASK_WORKER_THREADS)
    {
      ACE_ERROR_RETURN((LM_ERROR,
                        ACE_TEXT ("(%P|%t) TP_Task failed to open.  ")
                        ACE_TEXT ("num_threads (%u) is too large.  Max is %d.\n"),
                        num, MAX_THREADPOOL_TASK_WORKER_THREADS),
                        -1);
    }

  // We need the lock acquired from here on out.
  GuardType guard(this->lock_);

  // We can assume that we are in the proper state to handle this open()
  // call as long as we haven't been open()'ed before.
  if (this->opened_)
    {
      //FUZZ: disable check_for_lack_ACE_OS
      ACE_ERROR_RETURN((LM_ERROR,
                        ACE_TEXT ("(%P|%t) TP_Task failed to open.  ")
                        ACE_TEXT ("Task has previously been open()'ed.\n")),
                       -1);
      //FUZZ: enable check_for_lack_ACE_OS
    }

  // Activate this task object with 'num' worker threads.
  if (this->activate(THR_NEW_LWP | THR_JOINABLE, num) != 0)
    {
      // Assumes that when activate returns non-zero return code that
      // no threads were activated.
      ACE_ERROR_RETURN((LM_ERROR,
                        ACE_TEXT ("(%P|%t) TP_Task failed to activate ")
                        ACE_TEXT ("(%d) worker threads.\n"),
                        num),
                       -1);
    }

  // Now we have passed the point where we can say we've been open()'ed before.
  this->opened_ = true;

  // Now we wait until all of the threads have started.
  while (this->num_threads_ != num)
    {
      this->active_workers_.wait();
    }

  // We can now accept requests (via our add_request() method).
  this->accepting_requests_ = true;

  return 0;
}
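
The argument handling at the top of open() can be summarized in a small standalone sketch. It assumes Thread_Counter is an unsigned integer type (unsigned long is used here as a guess) and uses a hypothetical function name, resolve_thread_count(), together with a hypothetical constant kMaxWorkers that mirrors MAX_THREADPOOL_TASK_WORKER_THREADS. Note that a static_cast from void* cannot verify the pointed-to type, so the caller is responsible for passing the address of a Thread_Counter.

```cpp
#include <cassert>

// Assumption: the real Thread_Counter is some unsigned integer type.
typedef unsigned long Thread_Counter;

// Mirrors MAX_THREADPOOL_TASK_WORKER_THREADS from the class above.
static const Thread_Counter kMaxWorkers = 50;

// Hypothetical helper: returns the validated thread count, or -1 if the
// requested count is outside the range [1, kMaxWorkers].
long resolve_thread_count(void* num_threads_ptr) {
  Thread_Counter num = 1;  // Default when no argument is supplied.

  if (num_threads_ptr != nullptr) {
    // The cast trusts the caller; a wrong pointee type cannot be detected.
    num = *static_cast<Thread_Counter*>(num_threads_ptr);
  }

  if (num < 1 || num > kMaxWorkers) {
    return -1;
  }
  return static_cast<long>(num);
}
```

A caller following this convention would declare a Thread_Counter variable and pass its address, or pass a null pointer to accept the default of one worker thread.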

int TAO::CSD::TP_Task::svc ( void   )  [virtual]

The "mainline" executed by each worker thread.

Reimplemented from ACE_Task_Base.

Definition at line 137 of file CSD_TP_Task.cpp.

{
  // Account for this current worker thread having started the
  // execution of this svc() method.
  {
    GuardType guard(this->lock_);
    // Put the thread id into a collection which is used to check whether
    // the orb shutdown is called by one of the threads in the pool.
    ACE_thread_t thr_id = ACE_OS::thr_self ();
    this->activated_threads_.push_back(thr_id);
    ++this->num_threads_;
    this->active_workers_.signal();
  }

  // This visitor object will be re-used over and over again as part of
  // the "GetWork" logic below.
  TP_Dispatchable_Visitor dispatchable_visitor;

  // Start the "GetWork-And-PerformWork" loop for the current worker thread.
  while (1)
    {
      TP_Request_Handle request;

      // Do the "GetWork" step.
      {
        // Acquire the lock until just before we decide to "PerformWork".
        GuardType guard(this->lock_);

        // Start the "GetWork" loop.
        while (request.is_nil())
        {
          if (this->shutdown_initiated_)
            {
              // This breaks us out of all loops with one fell swoop.
              return 0;
            }

          if (this->deferred_shutdown_initiated_)
            {
              this->deferred_shutdown_initiated_  = false;
              return 0;
            }

          // There is no need to visit the queue if it is empty.
          if (!this->queue_.is_empty())
            {
              // Visit the requests in the queue in hopes of
              // locating the first "dispatchable" (ie, not busy) request.
              // If a dispatchable request is located, it is extracted
              // from the queue and saved in a handle data member in the
              // visitor object.
              this->queue_.accept_visitor(dispatchable_visitor);

              // If a dispatchable request is located, it is extracted
              // from the queue and saved in a handle data member in the
              // visitor object.  Let's get a "copy" (or a NULL pointer
              // if the visitor didn't locate/extract one).
              request = dispatchable_visitor.request();
            }

          // Either the queue is empty or we couldn't find any dispatchable
          // requests in the queue at this time.
          if (request.is_nil())
            {
              // Let's wait until we hear about the possibility of
              // work before we go look again.
              this->work_available_.wait();
            }
        }

        // We have dropped out of the "while (request.is_nil())" loop.
        // We only get here if we located/extracted a dispatchable request
        // from the queue.  Note that the visitor will have already
        // marked the target servant as now being busy (because of us).
        // We can now safely release the lock.
      }

      // Do the "PerformWork" step.  We don't need the lock_ to do this.
      request->dispatch();

      // Now that the request has been dispatched, we need to mark the target
      // servant as no longer being busy, and we need to signal any wait()'ing
      // worker threads that there may be some dispatchable requests in the
      // queue now for this not-busy servant.  We need the lock_ to do this.
      {
        GuardType guard(this->lock_);
        request->mark_as_ready();
        this->work_available_.signal();
      }

      // Reset the visitor since we use it over and over.  This
      // will cause the visitor to drop any reference to
      // the dispatched request.
      dispatchable_visitor.reset();

      // Note that the request will be "released" here when the request
      // handle falls out of scope and its destructor performs the
      // _remove_ref() call on the underlying TP_Request object.
    }
}
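
The "GetWork" step relies on finding the first queued request whose target servant is not busy, and on marking that servant busy as part of the extraction. The single-threaded sketch below shows only that selection rule. RequestQueue, QueuedRequest, take_dispatchable(), mark_ready(), and demo_dispatch_order() are hypothetical stand-ins for TP_Queue and TP_Dispatchable_Visitor, and servants are identified by plain strings for simplicity.

```cpp
#include <cassert>
#include <deque>
#include <set>
#include <string>

// Hypothetical queued request: a servant name plus a request id.
struct QueuedRequest {
  std::string servant;
  int id;
};

// Hypothetical stand-in for TP_Queue plus TP_Dispatchable_Visitor.
class RequestQueue {
public:
  void put(const QueuedRequest& r) { pending_.push_back(r); }

  // Extract the first request whose servant is idle and mark that servant
  // busy. Returns false if nothing is dispatchable right now.
  bool take_dispatchable(QueuedRequest& out) {
    for (std::deque<QueuedRequest>::iterator it = pending_.begin();
         it != pending_.end(); ++it) {
      if (busy_.count(it->servant) == 0) {
        out = *it;
        busy_.insert(it->servant);
        pending_.erase(it);  // Safe: we return immediately afterwards.
        return true;
      }
    }
    return false;
  }

  // Called after a dispatch completes, like mark_as_ready() in svc().
  void mark_ready(const QueuedRequest& r) { busy_.erase(r.servant); }

private:
  std::deque<QueuedRequest> pending_;
  std::set<std::string> busy_;
};

// Usage sketch: records the order in which request ids become dispatchable.
std::string demo_dispatch_order() {
  RequestQueue q;
  q.put({"A", 1});
  q.put({"A", 2});
  q.put({"B", 3});

  auto take = [&q](QueuedRequest& r) {
    return q.take_dispatchable(r) ? std::to_string(r.id) : std::string("-");
  };

  QueuedRequest a, b, c, d;
  std::string order = take(a);  // Servant A is idle: request 1.
  order += "," + take(b);       // A is busy, so request 2 is skipped: 3.
  order += "," + take(c);       // A and B are both busy: nothing.
  q.mark_ready(a);              // Request 1 finished; A is idle again.
  order += "," + take(d);       // Request 2 is now dispatchable.
  return order;
}
```

In the real svc() loop, the point at which take_dispatchable() fails corresponds to the wait on work_available_, and mark_ready() corresponds to the signal that wakes a waiting worker.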


Member Data Documentation

bool TAO::CSD::TP_Task::accepting_requests_ [private]

Flag used to indicate when this task will (or will not) accept requests via the add_request() method.

Definition at line 132 of file CSD_TP_Task.h.

Thread_Ids TAO::CSD::TP_Task::activated_threads_ [private]

The list of ids for the threads launched by this task.

Definition at line 153 of file CSD_TP_Task.h.

ConditionType TAO::CSD::TP_Task::active_workers_ [private]

This condition will be signal()'ed each time the num_threads_ data member has its value changed. This is used to keep the close(1) invocation (ie, a shutdown request) blocked until all of the worker threads have stopped running.

Definition at line 128 of file CSD_TP_Task.h.

bool TAO::CSD::TP_Task::deferred_shutdown_initiated_ [private]

Complete shutdown needed to be deferred because the thread calling close(1) was also one of the ThreadPool threads.

Definition at line 139 of file CSD_TP_Task.h.

LockType TAO::CSD::TP_Task::lock_ [private]

Lock to protect the "state" (all of the data members) of this object.

Definition at line 115 of file CSD_TP_Task.h.

Thread_Counter TAO::CSD::TP_Task::num_threads_ [private]

The number of currently active worker threads.

Definition at line 145 of file CSD_TP_Task.h.

bool TAO::CSD::TP_Task::opened_ [private]

Flag used to avoid multiple open() calls.

Definition at line 142 of file CSD_TP_Task.h.

TP_Queue TAO::CSD::TP_Task::queue_ [private]

The queue of pending servant requests (a.k.a. the "request queue").

Definition at line 148 of file CSD_TP_Task.h.

bool TAO::CSD::TP_Task::shutdown_initiated_ [private]

Flag used to initiate a shutdown request to all worker threads.

Definition at line 135 of file CSD_TP_Task.h.

ConditionType TAO::CSD::TP_Task::work_available_ [private]

Condition used to signal worker threads that they may be able to find a request in the queue_ that needs to be dispatched to a servant that is currently "not busy". This condition will be signal()'ed each time a new request is added to the queue_, and also when a servant has become "not busy".

Definition at line 122 of file CSD_TP_Task.h.


The documentation for this class was generated from the following files: