PThreadUtil.h

Go to the documentation of this file.
00001 #ifndef _SINGLEDISH_PTHREAD_UTILS_H_
00002 #define _SINGLEDISH_PTHREAD_UTILS_H_
00003 
#include <pthread.h>
#include <unistd.h>

#include <cassert>
#include <iostream>
#include <sstream>
#include <stdexcept>
00010 
00011 using namespace std;
00012 
// Throws std::runtime_error carrying `msg` when `condition` evaluates to true.
//
// The do { ... } while (false) wrapper makes the expansion a single statement,
// so the macro can be used in an unbraced if/else branch and must be followed
// by a semicolon. The exception type is fully qualified so that the expansion
// does not depend on a using-directive being in effect at the call site.
#define THROW_IF(condition, msg) \
  do { \
    if ((condition)) { \
      throw ::std::runtime_error((msg)); \
    } \
  } while (false)
00019 
00020 namespace casa { //# NAMESPACE CASA - BEGIN
00021 namespace sdfiller { //# NAMESPACE SDFILLER - BEGIN
00022 
// Owning wrapper around a pthread_mutex_t.
//
// The mutex is initialized with default attributes by the constructor and
// destroyed by the destructor. lock() and unlock() throw std::runtime_error
// when the underlying pthread call fails; try_lock() returns the raw
// pthread_mutex_trylock() result to the caller without throwing.
//
// Instances cannot be copied: a pthread_mutex_t must not be duplicated.
class Mutex {
public:
  // Initializes the mutex with default attributes.
  // Throws std::runtime_error if pthread_mutex_init() reports an error.
  Mutex() {
//    cout << "Mutex::Mutex()" << endl;
    // pthread_mutex_init() is the only initialization performed; the object
    // must not additionally be set from PTHREAD_MUTEX_INITIALIZER, because
    // initializing an already initialized mutex is undefined.
    int ret = pthread_mutex_init(&mutex_, NULL);
    if (ret != 0) {
      throw std::runtime_error("Mutex::Mutex() failed to initalize mutex");
    }
  }

  // Destroys the mutex. A failure is reported on std::cerr instead of being
  // thrown, since an exception escaping a destructor terminates the program.
  ~Mutex() {
//    cout << "Mutex::~Mutex()" << endl;
    int ret = pthread_mutex_destroy(&mutex_);
    if (ret != 0) {
      std::cerr << "Mutex::~Mutex() failed to destroy mutex" << std::endl;
    }
  }

  // Blocks until the mutex is acquired.
  // Returns 0 on success; throws std::runtime_error otherwise.
  int lock() {
//    cout << "Mutex::lock()" << endl;
    int ret = pthread_mutex_lock(&mutex_);
    if (ret != 0) {
      throw std::runtime_error("Mutex::lock() failed to lock mutex");
    }
    return ret;
  }

  // Releases the mutex.
  // Returns 0 on success; throws std::runtime_error otherwise.
  int unlock() {
//    cout << "Mutex::unlock()" << endl;
    int ret = pthread_mutex_unlock(&mutex_);
    if (ret != 0) {
      throw std::runtime_error("Mutex::unlock() failed to unlock mutex");
    }
    return ret;
  }

  // Attempts to acquire the mutex without blocking.
  // Returns the pthread_mutex_trylock() result unchanged (0 when the lock was
  // taken, an error number otherwise); never throws.
  int try_lock() {
//    cout << "Mutex::try_lock()" << endl;
    return pthread_mutex_trylock(&mutex_);
  }

protected:
  pthread_mutex_t mutex_;

  // PCondition needs the raw pthread_mutex_t for pthread_cond_wait().
  friend class PCondition;

private:
  // Copying is disabled (declared, intentionally not defined).
  Mutex(Mutex const &);
  Mutex &operator=(Mutex const &);
};
00058 
00059 class PCondition {
00060 public:
00061   PCondition(Mutex *mutex) :
00062       mutex_(&(mutex->mutex_)) {
00063     int ret = pthread_cond_init(&cond_, NULL);
00064     THROW_IF(ret != 0,
00065         "PCondition::PCondition() failed to initialize pthread_cond_t");
00066   }
00067 
00068   virtual ~PCondition() {
00069     int ret = pthread_cond_destroy(&cond_);
00070     THROW_IF(ret != 0,
00071         "PCondition::~PCondition() failed to destroy pthread_cond_t");
00072   }
00073   int lock() {
00074 //    cout << "PCondition::lock()" << endl;
00075     return pthread_mutex_lock(mutex_);
00076   }
00077 
00078   int unlock() {
00079 //    cout << "PCondition::unlock()" << endl;
00080     return pthread_mutex_unlock(mutex_);
00081   }
00082 
00083   int wait() {
00084 //    cout << "PCondition::wait()" << endl;
00085     int ret = pthread_cond_wait(&cond_, mutex_);
00086     THROW_IF(ret != 0, "PCondition::wait() failed to block pthread_cond_t");
00087     return ret;
00088   }
00089 
00090   int signal() {
00091 //    cout << "PCondition::signal()" << endl;
00092     int ret = pthread_cond_signal(&cond_);
00093     THROW_IF(ret != 0, "PCondition::signal() failed to release pthread_cond_t");
00094     return ret;
00095   }
00096 private:
00097   pthread_mutex_t *mutex_;
00098   pthread_cond_t cond_;
00099 };
00100 
00101 // implementation of producer consumer model
00102 template<class DataType, ssize_t BufferSize>
00103 class ProducerConsumerModelContext {
00104 public:
00105   typedef ProducerConsumerModelContext<DataType, BufferSize> _Context;
00106 
00107   // production function
00108   static void produce(_Context *context, DataType item) {
00109     context->lock();
00110 
00111     // wait until buffer becomes available for production
00112     while (context->buffer_is_full()) {
00113       context->producer_wait();
00114     }
00115 
00116     assert(!context->buffer_is_full());
00117 
00118     context->push_product(item);
00119 
00120     context->producer_next();
00121 
00122     // send a signal to consumer since something is produced
00123     context->consumer_signal();
00124 
00125     context->unlock();
00126   }
00127 
00128   // consumption function
00129   // return false if no more products available
00130   // otherwise return true
00131   static bool consume(_Context *context, DataType *item) {
00132     context->lock();
00133 
00134     // wait until something is produced
00135     while (context->buffer_is_empty()) {
00136       context->consumer_wait();
00137     }
00138 
00139     assert(!context->buffer_is_empty());
00140 
00141     context->pop_product(item);
00142     bool more_products = (*item != context->end_of_production_);
00143 
00144     context->consumer_next();
00145 
00146     // send a signal to consumer since there are available slot in buffer
00147     context->producer_signal();
00148 
00149     context->unlock();
00150 
00151     return more_products;
00152   }
00153 
00154   // it should be called when production complete
00155   static void complete_production(_Context *context) {
00156     produce(context, context->end_of_production_);
00157   }
00158 
00159   // constructor
00160   ProducerConsumerModelContext(DataType const terminator) :
00161       end_of_production_(terminator), num_product_in_buffer_(0),
00162       producer_index_(0), consumer_index_(0), mutex_(),
00163       consumer_condition_(&mutex_), producer_condition_(&mutex_) {
00164     //std::cout << "end_of_production = " << end_of_production_ << std::endl;
00165   }
00166 
00167   // destructor
00168   ~ProducerConsumerModelContext() {
00169   }
00170 
00171   // utility
00172   template<class T>
00173   static void locked_print(T msg, _Context *context) {
00174     context->lock();
00175     cout << msg << endl;
00176     context->unlock();
00177   }
00178 
00179 private:
00180   int lock() {
00181     return mutex_.lock();
00182   }
00183 
00184   int unlock() {
00185     return mutex_.unlock();
00186   }
00187 
00188   int try_lock() {
00189     return mutex_.try_lock();
00190   }
00191 
00192   int producer_wait() {
00193     return producer_condition_.wait();
00194   }
00195 
00196   int producer_signal() {
00197     return producer_condition_.signal();
00198   }
00199 
00200   int consumer_wait() {
00201     return consumer_condition_.wait();
00202   }
00203 
00204   int consumer_signal() {
00205     return consumer_condition_.signal();
00206   }
00207 
00208   bool buffer_is_full() {
00209     return num_product_in_buffer_ >= BufferSize;
00210   }
00211 
00212   bool buffer_is_empty() {
00213     return num_product_in_buffer_ <= 0;
00214   }
00215 
00216   void producer_next() {
00217     producer_index_++;
00218     producer_index_ %= BufferSize;
00219     num_product_in_buffer_++;
00220   }
00221 
00222   void consumer_next() {
00223     consumer_index_++;
00224     consumer_index_ %= BufferSize;
00225     num_product_in_buffer_--;
00226   }
00227 
00228   void push_product(DataType item) {
00229     buffer_[producer_index_] = item;
00230   }
00231 
00232   void pop_product(DataType *item) {
00233     *item = buffer_[consumer_index_];
00234   }
00235 
00236   // terminator data
00237   // (product == end_of_production_) indicates that production
00238   // is completed.
00239   DataType const end_of_production_;
00240   DataType buffer_[BufferSize];
00241   ssize_t num_product_in_buffer_;
00242   ssize_t producer_index_;
00243   ssize_t consumer_index_;
00244   Mutex mutex_;
00245   PCondition consumer_condition_;
00246   PCondition producer_condition_;
00247 };
00248 
// Thread helpers; only the declarations are visible in this header.
//
// create_thread: the parameter list mirrors pthread_create() (thread id
// output, attributes, entry function, entry argument) — presumably a wrapper
// around it; confirm against the definition.
void create_thread(
    pthread_t *tid,
    pthread_attr_t *attr,
    void *(*func)(void *),
    void *param);

// join_thread: takes a thread id and a location for the thread's exit
// status — presumably a wrapper around pthread_join(); confirm against the
// definition.
void join_thread(
    pthread_t *tid,
    void **status);
00252 
00253 } //# NAMESPACE SDFILLER - END
00254 } //# NAMESPACE CASA - END
00255 
00256 #endif /* _SINGLEDISH_PTHREAD_UTILS_H_ */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 31 Aug 2016 for casa by  doxygen 1.6.1