00001 // -*- C++ -*- 00002 00003 //============================================================================= 00004 /** 00005 * @file Incoming_Message_Queue.h 00006 * 00007 * Incoming_Message_Queue.h,v 1.24 2006/06/19 19:47:17 jwillemsen Exp 00008 * 00009 * @author Balachandran Natarajan <bala@cs.wustl.edu> 00010 */ 00011 //============================================================================= 00012 00013 #ifndef TAO_INCOMING_MESSAGE_QUEUE_H 00014 #define TAO_INCOMING_MESSAGE_QUEUE_H 00015 00016 #include /**/ "ace/pre.h" 00017 00018 #include "tao/Pluggable_Messaging_Utils.h" 00019 #include "ace/Message_Block.h" 00020 00021 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00022 # pragma once 00023 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00024 00025 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00026 class ACE_Allocator; 00027 ACE_END_VERSIONED_NAMESPACE_DECL 00028 00029 TAO_BEGIN_VERSIONED_NAMESPACE_DECL 00030 00031 class TAO_ORB_Core; 00032 class TAO_Queued_Data; 00033 class TAO_Transport; 00034 00035 /** 00036 * @class TAO_Incoming_Message_Queue 00037 * 00038 * @brief A queue of the messages in the incoming data path. 00039 * 00040 * Please read the documentation in the TAO_Transport class to find 00041 * out more about the design of the incoming data path. 00042 * 00043 * Under certain conditions TAO may have to maintain a queue 00044 * per-connection. This queue is drained by the pluggable 00045 * protocols framework, normally under control of the ACE_Reactor, but 00046 * other configurations are conceivable. 00047 * 00048 * The memory that is allocated for holding the messages comes from 00049 * the global pool for the following reasons 00050 * 00051 * - the thread that reads a part of the message would not be the same 00052 * thread that reads and fills the rest of the message 00053 * - the thread that actually processes the message can be totally 00054 * different. 00055 * 00056 */ 00057 00058 class TAO_Export TAO_Incoming_Message_Queue 00059 { 00060 public: 00061 00062 /// Constructor. 00063 TAO_Incoming_Message_Queue (TAO_ORB_Core *orb_core); 00064 00065 /// Destructor. 00066 ~TAO_Incoming_Message_Queue (void); 00067 00068 /// Adding and deleting a node from the queue. 00069 TAO_Queued_Data *dequeue_head (void); 00070 TAO_Queued_Data *dequeue_tail (void); 00071 int enqueue_tail (TAO_Queued_Data *nd); 00072 00073 /// Return the length of the queue.. 00074 CORBA::ULong queue_length (void) const; 00075 00076 private: 00077 00078 friend class TAO_Transport; 00079 00080 private: 00081 /*! 00082 \brief A circular linked list of messages awaiting processing. 00083 00084 \a last_message_added_ points to the most recent message added to 00085 the list. The earliest addition can be easily accessed via 00086 \a last_message_added_->next_. 00087 */ 00088 TAO_Queued_Data *last_added_; 00089 00090 /// The size of the queue 00091 CORBA::ULong size_; 00092 00093 /// Copy of our ORB Core 00094 TAO_ORB_Core *orb_core_; 00095 }; 00096 00097 /// Constant value indicating that the correct value is unknown yet, 00098 /// probably parsing the header failed due to insufficient data in buffer. 00099 const size_t TAO_MISSING_DATA_UNDEFINED = ~((size_t) 0); // MAX_SIZE_T 00100 00101 /************************************************************************/ 00102 00103 /** 00104 * @class TAO_Queued_Data 00105 * 00106 * @brief Represents a node in the queue of incoming messages. 00107 * 00108 * This class contains necessary information about a message that is 00109 * stored in the queue. Such a node can be used by the incoming thread 00110 * from the reactor to dequeue and process the message by sending it 00111 * to the higher layers of the ORB. 00112 * 00113 * The ACE_Message_Block contained within this class may contain a chain 00114 * of message blocks (usually when GIOP fragments are involved). In that 00115 * case consolidate () needs to be called prior to being sent to higher 00116 * layers of the ORB when the GIOP fragment chain is complete. 00117 */ 00118 00119 class TAO_Export TAO_Queued_Data 00120 { 00121 public: 00122 /// Default Constructor 00123 TAO_Queued_Data (ACE_Allocator *alloc = 0); 00124 00125 /// Constructor. 00126 TAO_Queued_Data (ACE_Message_Block *mb, ACE_Allocator *alloc = 0); 00127 00128 /// Copy constructor. 00129 TAO_Queued_Data (const TAO_Queued_Data &qd); 00130 00131 /// Creation of a node in the queue. 00132 static TAO_Queued_Data* make_queued_data (ACE_Allocator *alloc = 0); 00133 00134 /// Deletion of a node from the queue. 00135 static void release (TAO_Queued_Data *qd); 00136 00137 /// Duplicate ourselves. This creates a copy of ourselves on the 00138 /// heap and returns a pointer to the duplicated node. 00139 static TAO_Queued_Data* duplicate (TAO_Queued_Data &qd); 00140 00141 /// Consolidate this fragments chained message blocks into one. 00142 /// @return -1 if consolidation failed, eg out or memory, otherwise 0 00143 int consolidate (void); 00144 00145 public: 00146 00147 /// The message block that contains the message. 00148 ACE_Message_Block *msg_block_; 00149 00150 /*! 00151 @name Missing Data details 00152 00153 The \a missing_data_ member contains the number of bytes of 00154 data missing from \a msg_block_. 00155 */ 00156 //@{ 00157 /*! Data missing in the above message that hasn't been read or processed yet, 00158 the value TAO_MISSING_DATA_UNDEFINED indicates it hasn't been processed yet, 00159 otherwise greater or equal zero. */ 00160 size_t missing_data_; 00161 //@} 00162 00163 /// Many protocols like GIOP have a major and minor version 00164 /// information that would be needed to read and decipher the 00165 /// message. 00166 CORBA::Octet major_version_; 00167 CORBA::Octet minor_version_; 00168 00169 /// The byte order of the message that is stored in the node. 00170 CORBA::Octet byte_order_; 00171 00172 /// Some messages can be fragmented by the protocol (this is an ORB 00173 /// level fragmentation on top of the TCP/IP fragmentation. This 00174 /// member indicates whether the message that we have recd. and 00175 /// queue already has more fragments that is missing.. 00176 CORBA::Octet more_fragments_; 00177 00178 /// The message type of the message 00179 TAO_Pluggable_Message_Type msg_type_; 00180 00181 /// Pounter to the next element in the queue. 00182 TAO_Queued_Data *next_; 00183 00184 private: 00185 /// Replace the datablock with a one allocated on the heap or 00186 /// allocator 00187 static void replace_data_block (ACE_Message_Block &mb); 00188 00189 private: 00190 00191 /// The allocator used to allocate this class. 00192 ACE_Allocator *allocator_; 00193 }; 00194 00195 TAO_END_VERSIONED_NAMESPACE_DECL 00196 00197 #if defined (__ACE_INLINE__) 00198 # include "tao/Incoming_Message_Queue.inl" 00199 #endif /* __ACE_INLINE__ */ 00200 00201 #include /**/ "ace/post.h" 00202 #endif /*TAO_INCOMING_MESSAGE_QUEUE_H*/