A simple custom Thread-Pool servant dispatching strategy class. More...
#include <CSD_TP_Strategy.h>
A simple custom Thread-Pool servant dispatching strategy class.
This class represents a concrete implementation of a "Custom Servant Dispatching Strategy". This implementation is referred to as the "Thread Pool Strategy" reference implementation.
A custom servant dispatching strategy object can be applied to a POA object in order to carry out the servant dispatching duties for that POA.
Definition at line 57 of file CSD_TP_Strategy.h.
enum TAO::CSD::TP_Strategy::CustomRequestOutcome |
Return codes for the custom request methods, custom_synch_request() and custom_asynch_request().
Definition at line 76 of file CSD_TP_Strategy.h.
{
  /// The request was accepted and placed on the request queue.
  REQUEST_DISPATCHED,

  /// A worker thread has executed (completed) the request.
  REQUEST_EXECUTED,

  /// The request was taken off the queue and cancelled.
  REQUEST_CANCELLED,

  /// The request queue refused to accept the request.
  REQUEST_REJECTED
};
TAO::CSD::TP_Strategy::TP_Strategy | ( | Thread_Counter | num_threads = 1 , |
|
bool | serialize_servants = true | |||
) |
Constructor.
Definition at line 8 of file CSD_TP_Strategy.inl.
: num_threads_(num_threads),
  serialize_servants_(serialize_servants)
{
  // Nothing else to do. The thread count is stored without validation;
  // the caller is expected to pass num_threads > 0.
}
TAO::CSD::TP_Strategy::~TP_Strategy | ( | ) | [virtual] |
void TAO::CSD::TP_Strategy::cancel_requests | ( | PortableServer::Servant | servant | ) |
Cancel all requests that are targeted for the provided servant. This method is intended to be called at the user application level.
Definition at line 245 of file CSD_TP_Strategy.cpp.
{
  // Ask the task to cancel every request for this servant that is still
  // sitting in its queue.
  this->task_.cancel_servant(servant);
}
TAO::CSD::TP_Strategy::CustomRequestOutcome TAO::CSD::TP_Strategy::custom_asynch_request | ( | TP_Custom_Request_Operation * | op | ) |
Inject an asynchronous, custom request into the request queue. This will return control to the calling thread once the request has been placed into the queue (or rejected). Will return REQUEST_DISPATCHED or REQUEST_REJECTED.
Definition at line 50 of file CSD_TP_Strategy.cpp.
{
  // Servant state handle for the operation's servant, as produced by
  // get_servant_state().
  TP_Servant_State::HandleType state =
    this->get_servant_state(op->servant());

  // Wrap the custom operation in an asynchronous request object.
  TP_Custom_Asynch_Request_Handle asynch_request =
    new TP_Custom_Asynch_Request(op, state.in());

  // Queue it and report whether the task accepted it. The caller is not
  // blocked waiting for the request to run.
  if (this->task_.add_request(asynch_request.in()))
    {
      return REQUEST_DISPATCHED;
    }

  return REQUEST_REJECTED;
}
TAO::CSD::TP_Strategy::CustomRequestOutcome TAO::CSD::TP_Strategy::custom_synch_request | ( | TP_Custom_Request_Operation * | op | ) |
Inject a synchronous, custom request into the request queue. This will block the calling thread until the request is handled (executed or cancelled) or rejected. Will return REQUEST_EXECUTED, REQUEST_CANCELLED, or REQUEST_REJECTED.
Definition at line 30 of file CSD_TP_Strategy.cpp.
{
  // Servant state handle for the operation's servant, as produced by
  // get_servant_state().
  TP_Servant_State::HandleType state =
    this->get_servant_state(op->servant());

  // Wrap the custom operation in a synchronous request object.
  TP_Custom_Synch_Request_Handle synch_request =
    new TP_Custom_Synch_Request(op, state.in());

  const bool queued = this->task_.add_request(synch_request.in());
  if (!queued)
    {
      // The task refused the request; there is nothing to wait for.
      return REQUEST_REJECTED;
    }

  // Block here until the request has been handled. wait() yields a true
  // value when the request was executed and a false value when it was
  // cancelled.
  if (synch_request->wait())
    {
      return REQUEST_EXECUTED;
    }

  return REQUEST_CANCELLED;
}
TAO::CSD::Strategy_Base::DispatchResult TAO::CSD::TP_Strategy::dispatch_collocated_request_i | ( | TAO_ServerRequest & | server_request, | |
const PortableServer::ObjectId & | object_id, | |||
PortableServer::POA_ptr | poa, | |||
const char * | operation, | |||
PortableServer::Servant | servant | |||
) | [protected, virtual] |
Handle the dispatching of a collocated request.
This will cause a new "request" object to be created and pushed on to a "request queue". The worker threads are responsible for servicing the queue, and performing the actual dispatch logic.
Implements TAO::CSD::Strategy_Base.
Definition at line 124 of file CSD_TP_Strategy.cpp.
{
  // Queue a collocated request on the task and, for the two synchronous
  // flavours, block until a worker thread has handled it.
  //
  // Returns DISPATCH_REJECTED when the task refuses the request and
  // DISPATCH_HANDLED otherwise. Throws ::CORBA::NO_IMPLEMENT when a
  // request that this thread is waiting on gets cancelled.

  TP_Servant_State::HandleType servant_state =
    this->get_servant_state(servant);

  const bool is_sync_with_server = server_request.sync_with_server();
  const bool is_synchronous      = server_request.response_expected();

  // At most one of the two typed handles is set below. It is kept so this
  // thread can call wait() on the concrete request type after queuing;
  // the base-typed handle is what gets passed to the task.
  TP_Collocated_Synch_Request_Handle synch_request;
  TP_Collocated_Synch_With_Server_Request_Handle synch_with_server_request;
  TP_Request_Handle request;

  // Create the request object using the appropriate concrete type.
  if (is_sync_with_server)
    {
      synch_with_server_request =
        new TP_Collocated_Synch_With_Server_Request(server_request,
                                                    object_id,
                                                    poa,
                                                    operation,
                                                    servant,
                                                    servant_state.in());

      // Give the base request handle its own "copy": add a reference
      // before assigning the raw pointer to the second handle.
      synch_with_server_request->_add_ref();
      request = synch_with_server_request.in();
    }
  else if (is_synchronous)
    {
      synch_request =
        new TP_Collocated_Synch_Request(server_request,
                                        object_id,
                                        poa,
                                        operation,
                                        servant,
                                        servant_state.in());

      // Give the base request handle its own "copy": add a reference
      // before assigning the raw pointer to the second handle.
      synch_request->_add_ref();
      request = synch_request.in();
    }
  else
    {
      // Nobody waits on an asynchronous request, so the (base) request
      // handle alone holds the request object.
      request = new TP_Collocated_Asynch_Request(server_request,
                                                 object_id,
                                                 poa,
                                                 operation,
                                                 servant,
                                                 servant_state.in());
    }

  // Hand the request object to our task so that it can add the request
  // to its "request queue".
  if (!this->task_.add_request(request.in()))
    {
      // Return the DISPATCH_REJECTED return code so that the caller (our
      // base class' dispatch_request() method) knows that we did not
      // handle the request, and that it should be rejected.
      return DISPATCH_REJECTED;
    }

  // We need to wait on the request object if the request type is a
  // synchronous request. The wait() result is held in a bool in both
  // branches (one branch previously stored it in an int and compared it
  // with false).
  if (!synch_request.is_nil())
    {
      const bool executed = synch_request->wait();
      if (!executed)
        {
          // Raise exception when request was cancelled.
          throw ::CORBA::NO_IMPLEMENT();
        }
    }
  else if (!synch_with_server_request.is_nil())
    {
      const bool executed = synch_with_server_request->wait();
      if (!executed)
        {
          // Raise exception when request was cancelled.
          throw ::CORBA::NO_IMPLEMENT();
        }
    }

  return DISPATCH_HANDLED;
}
TAO::CSD::Strategy_Base::DispatchResult TAO::CSD::TP_Strategy::dispatch_remote_request_i | ( | TAO_ServerRequest & | server_request, | |
const PortableServer::ObjectId & | object_id, | |||
PortableServer::POA_ptr | poa, | |||
const char * | operation, | |||
PortableServer::Servant | servant | |||
) | [protected, virtual] |
Handle the dispatching of a remote request.
This will cause a new "request" object to be created and pushed on to a "request queue". The worker threads are responsible for servicing the queue, and performing the actual dispatch logic.
Implements TAO::CSD::Strategy_Base.
Definition at line 86 of file CSD_TP_Strategy.cpp.
{
  TP_Servant_State::HandleType state = this->get_servant_state(servant);

  // Build the TP_Remote_Request that will be queued on task_.
  //
  // TBD-CSD: Need to use a Cached Allocator to "create" the
  //          TP_Remote_Request objects. For now, use the heap.
  TP_Remote_Request_Handle remote_request =
    new TP_Remote_Request(server_request,
                          object_id,
                          poa,
                          operation,
                          servant,
                          state.in());

  // Offer the request to the task's "request queue". If the task turns it
  // down, report DISPATCH_REJECTED so that the caller (our base class'
  // dispatch_request() method) knows the request was not handled and
  // should be rejected.
  const bool queued = this->task_.add_request(remote_request.in());

  return queued ? TAO::CSD::Strategy_Base::DISPATCH_HANDLED
                : TAO::CSD::Strategy_Base::DISPATCH_REJECTED;
}
TAO::CSD::TP_Servant_State::HandleType TAO::CSD::TP_Strategy::get_servant_state | ( | PortableServer::Servant | servant | ) | [private] |
Helper method that is responsible for looking up the servant state object in the servant state map *if* the "serialize servants" flag is set to true. In the case where the "serialize servants" flag is set to false, then a "nil" servant state handle object is returned.
Parameters: servant - input - a pointer to the servant object.
Exceptions: PortableServer::POA::ServantNotActive if the servant state cannot be determined.
Definition at line 253 of file CSD_TP_Strategy.cpp.
{
  // Default-constructed handle; returned untouched when servants are not
  // being serialized.
  TP_Servant_State::HandleType state;

  if (!this->serialize_servants_)
    {
      return state;
    }

  // Serialization is enabled: look the servant up in the state map.
  state = this->servant_state_map_.find(servant);
  return state;
}
bool TAO::CSD::TP_Strategy::poa_activated_event_i | ( | TAO_ORB_Core & | orb_core | ) | [protected, virtual] |
Event - The POA has been activated. This will activate the worker thread(s). Returns true if the worker threads were activated successfully. Otherwise, returns false.
Implements TAO::CSD::Strategy_Base.
Definition at line 64 of file CSD_TP_Strategy.cpp.
void TAO::CSD::TP_Strategy::poa_deactivated_event_i | ( | ) | [protected, virtual] |
Event - The POA has been deactivated. This will shutdown the worker thread(s).
Implements TAO::CSD::Strategy_Base.
Definition at line 73 of file CSD_TP_Strategy.cpp.
{
  // close() is called with two different flag values:
  //   1 - "shutdown": requested here, causes all worker threads to
  //       shut down.
  //   0 - "a single worker thread is going away": passed by the worker
  //       threads themselves when they invoke close().
  this->task_.close(1);
}
void TAO::CSD::TP_Strategy::servant_activated_event_i | ( | PortableServer::Servant | servant, | |
const PortableServer::ObjectId & | oid | |||
) | [protected, virtual] |
Event - A servant has been activated.
Reimplemented from TAO::CSD::Strategy_Base.
Definition at line 217 of file CSD_TP_Strategy.cpp.
{
  // The servant state map is only maintained when servants are being
  // serialized.
  if (!this->serialize_servants_)
    {
      return;
    }

  // Register the newly activated servant in the servant state map.
  this->servant_state_map_.insert(servant);
}
void TAO::CSD::TP_Strategy::servant_deactivated_event_i | ( | PortableServer::Servant | servant, | |
const PortableServer::ObjectId & | oid | |||
) | [protected, virtual] |
Event - A servant has been deactivated.
Reimplemented from TAO::CSD::Strategy_Base.
Definition at line 230 of file CSD_TP_Strategy.cpp.
{
  // First cancel whatever is still queued for this servant.
  this->task_.cancel_servant(servant);

  // The servant state map is only maintained when servants are being
  // serialized.
  if (!this->serialize_servants_)
    {
      return;
    }

  // Drop the servant's entry from the servant state map.
  this->servant_state_map_.remove(servant);
}
void TAO::CSD::TP_Strategy::set_num_threads | ( | Thread_Counter | num_threads | ) |
Set the number of threads in the pool (must be > 0).
Definition at line 19 of file CSD_TP_Strategy.inl.
{
  // Plain setter. The value is stored as given, without validation;
  // the caller is expected to pass num_threads > 0.
  this->num_threads_ = num_threads;
}
void TAO::CSD::TP_Strategy::set_servant_serialization | ( | bool | serialize_servants | ) |
Turn on/off serialization of servants.
Definition at line 28 of file CSD_TP_Strategy.inl.
{
  // Plain setter for the "serialize servants" flag.
  this->serialize_servants_ = serialize_servants;
}
Thread_Counter TAO::CSD::TP_Strategy::num_threads_ [private] |
The number of worker threads to use for the task.
Definition at line 180 of file CSD_TP_Strategy.h.
bool TAO::CSD::TP_Strategy::serialize_servants_ [private] |
The "serialize servants" flag.
Definition at line 183 of file CSD_TP_Strategy.h.
TP_Servant_State_Map TAO::CSD::TP_Strategy::servant_state_map_ [private] |
The map of servant state objects - only used when the "serialize servants" flag is set to true.
Definition at line 187 of file CSD_TP_Strategy.h.
TP_Task TAO::CSD::TP_Strategy::task_ [private] |
This is the active object used by the worker threads. The request queue is owned/managed by the task object. The strategy object puts requests into the task's request queue, and the worker threads service the queued requests by performing the actual servant request dispatching logic.
Definition at line 177 of file CSD_TP_Strategy.h.