PThreadUtil.h
Go to the documentation of this file.00001 #ifndef _SINGLEDISH_PTHREAD_UTILS_H_
00002 #define _SINGLEDISH_PTHREAD_UTILS_H_
00003
#include <pthread.h>
#include <unistd.h>

#include <cassert>
#include <iostream>
#include <sstream>
#include <stdexcept>
00010
00011 using namespace std;
00012
/**
 * Throw std::runtime_error carrying `msg` when `condition` evaluates to true.
 *
 * The body is wrapped in do { ... } while (false) so that the macro expands to
 * a single statement and can be used safely in an unbraced if/else.
 * The exception type is spelled ::std::runtime_error so that the expansion
 * does not depend on a using-directive (or on whatever `runtime_error` name
 * happens to be visible) at the point of use.
 *
 * @param condition expression convertible to bool; evaluated exactly once.
 * @param msg       message passed to the std::runtime_error constructor;
 *                  evaluated only when `condition` is true.
 */
#define THROW_IF(condition, msg) \
  do { \
    if ((condition)) { \
      throw ::std::runtime_error((msg)); \
    } \
  } while (false)
00019
00020 namespace casa {
00021 namespace sdfiller {
00022
/**
 * Owning wrapper around a POSIX mutex (pthread_mutex_t).
 *
 * The mutex is created with default attributes in the constructor and
 * destroyed in the destructor. Objects are not copyable: a pthread_mutex_t
 * must not be duplicated, and PCondition keeps a pointer to the member.
 */
class Mutex {
public:
  /**
   * Initialize the underlying mutex with default attributes.
   *
   * @throws std::runtime_error if pthread_mutex_init() reports an error.
   */
  Mutex() {
    // pthread_mutex_init() is the only initialization performed. The member
    // is intentionally NOT pre-set from PTHREAD_MUTEX_INITIALIZER: that macro
    // is meant for static initialization, is not portable inside a
    // mem-initializer, and initializing an already initialized mutex is
    // undefined behavior according to POSIX.
    int ret = pthread_mutex_init(&mutex_, NULL);
    if (ret != 0) {
      throw std::runtime_error("Mutex::Mutex() failed to initalize mutex");
    }
  }

  /**
   * Destroy the underlying mutex.
   *
   * A destructor must not throw (it would call std::terminate() when it is
   * implicitly noexcept, or when it runs during stack unwinding), so a
   * failure is reported on std::cerr instead of being raised.
   */
  ~Mutex() {
    int ret = pthread_mutex_destroy(&mutex_);
    if (ret != 0) {
      std::cerr << "Mutex::~Mutex() failed to destroy mutex" << std::endl;
    }
  }

  /**
   * Block until the mutex is acquired.
   *
   * @return 0 (any non-zero result of pthread_mutex_lock() is thrown).
   * @throws std::runtime_error if pthread_mutex_lock() fails.
   */
  int lock() {
    int ret = pthread_mutex_lock(&mutex_);
    if (ret != 0) {
      throw std::runtime_error("Mutex::lock() failed to lock mutex");
    }
    return ret;
  }

  /**
   * Release the mutex.
   *
   * @return 0 (any non-zero result of pthread_mutex_unlock() is thrown).
   * @throws std::runtime_error if pthread_mutex_unlock() fails.
   */
  int unlock() {
    int ret = pthread_mutex_unlock(&mutex_);
    if (ret != 0) {
      throw std::runtime_error("Mutex::unlock() failed to unlock mutex");
    }
    return ret;
  }

  /**
   * Try to acquire the mutex without blocking.
   *
   * @return the raw result of pthread_mutex_trylock(): 0 when the mutex was
   *         acquired, otherwise a non-zero error number. Never throws.
   */
  int try_lock() {
    return pthread_mutex_trylock(&mutex_);
  }

protected:
  pthread_mutex_t mutex_;

  // PCondition needs the raw pthread_mutex_t for pthread_cond_wait().
  friend class PCondition;

private:
  // Copying is disabled (declared but never defined).
  Mutex(Mutex const &);
  Mutex &operator=(Mutex const &);
};
00058
00059 class PCondition {
00060 public:
00061 PCondition(Mutex *mutex) :
00062 mutex_(&(mutex->mutex_)) {
00063 int ret = pthread_cond_init(&cond_, NULL);
00064 THROW_IF(ret != 0,
00065 "PCondition::PCondition() failed to initialize pthread_cond_t");
00066 }
00067
00068 virtual ~PCondition() {
00069 int ret = pthread_cond_destroy(&cond_);
00070 THROW_IF(ret != 0,
00071 "PCondition::~PCondition() failed to destroy pthread_cond_t");
00072 }
00073 int lock() {
00074
00075 return pthread_mutex_lock(mutex_);
00076 }
00077
00078 int unlock() {
00079
00080 return pthread_mutex_unlock(mutex_);
00081 }
00082
00083 int wait() {
00084
00085 int ret = pthread_cond_wait(&cond_, mutex_);
00086 THROW_IF(ret != 0, "PCondition::wait() failed to block pthread_cond_t");
00087 return ret;
00088 }
00089
00090 int signal() {
00091
00092 int ret = pthread_cond_signal(&cond_);
00093 THROW_IF(ret != 0, "PCondition::signal() failed to release pthread_cond_t");
00094 return ret;
00095 }
00096 private:
00097 pthread_mutex_t *mutex_;
00098 pthread_cond_t cond_;
00099 };
00100
00101
00102 template<class DataType, ssize_t BufferSize>
00103 class ProducerConsumerModelContext {
00104 public:
00105 typedef ProducerConsumerModelContext<DataType, BufferSize> _Context;
00106
00107
00108 static void produce(_Context *context, DataType item) {
00109 context->lock();
00110
00111
00112 while (context->buffer_is_full()) {
00113 context->producer_wait();
00114 }
00115
00116 assert(!context->buffer_is_full());
00117
00118 context->push_product(item);
00119
00120 context->producer_next();
00121
00122
00123 context->consumer_signal();
00124
00125 context->unlock();
00126 }
00127
00128
00129
00130
00131 static bool consume(_Context *context, DataType *item) {
00132 context->lock();
00133
00134
00135 while (context->buffer_is_empty()) {
00136 context->consumer_wait();
00137 }
00138
00139 assert(!context->buffer_is_empty());
00140
00141 context->pop_product(item);
00142 bool more_products = (*item != context->end_of_production_);
00143
00144 context->consumer_next();
00145
00146
00147 context->producer_signal();
00148
00149 context->unlock();
00150
00151 return more_products;
00152 }
00153
00154
00155 static void complete_production(_Context *context) {
00156 produce(context, context->end_of_production_);
00157 }
00158
00159
00160 ProducerConsumerModelContext(DataType const terminator) :
00161 end_of_production_(terminator), num_product_in_buffer_(0),
00162 producer_index_(0), consumer_index_(0), mutex_(),
00163 consumer_condition_(&mutex_), producer_condition_(&mutex_) {
00164
00165 }
00166
00167
00168 ~ProducerConsumerModelContext() {
00169 }
00170
00171
00172 template<class T>
00173 static void locked_print(T msg, _Context *context) {
00174 context->lock();
00175 cout << msg << endl;
00176 context->unlock();
00177 }
00178
00179 private:
00180 int lock() {
00181 return mutex_.lock();
00182 }
00183
00184 int unlock() {
00185 return mutex_.unlock();
00186 }
00187
00188 int try_lock() {
00189 return mutex_.try_lock();
00190 }
00191
00192 int producer_wait() {
00193 return producer_condition_.wait();
00194 }
00195
00196 int producer_signal() {
00197 return producer_condition_.signal();
00198 }
00199
00200 int consumer_wait() {
00201 return consumer_condition_.wait();
00202 }
00203
00204 int consumer_signal() {
00205 return consumer_condition_.signal();
00206 }
00207
00208 bool buffer_is_full() {
00209 return num_product_in_buffer_ >= BufferSize;
00210 }
00211
00212 bool buffer_is_empty() {
00213 return num_product_in_buffer_ <= 0;
00214 }
00215
00216 void producer_next() {
00217 producer_index_++;
00218 producer_index_ %= BufferSize;
00219 num_product_in_buffer_++;
00220 }
00221
00222 void consumer_next() {
00223 consumer_index_++;
00224 consumer_index_ %= BufferSize;
00225 num_product_in_buffer_--;
00226 }
00227
00228 void push_product(DataType item) {
00229 buffer_[producer_index_] = item;
00230 }
00231
00232 void pop_product(DataType *item) {
00233 *item = buffer_[consumer_index_];
00234 }
00235
00236
00237
00238
00239 DataType const end_of_production_;
00240 DataType buffer_[BufferSize];
00241 ssize_t num_product_in_buffer_;
00242 ssize_t producer_index_;
00243 ssize_t consumer_index_;
00244 Mutex mutex_;
00245 PCondition consumer_condition_;
00246 PCondition producer_condition_;
00247 };
00248
// Thread helper declarations; the definitions are not part of this header.
// NOTE(review): the parameter lists mirror pthread_create() (thread id,
// attributes, start routine, start-routine argument) and pthread_join()
// (thread id, exit-status slot) — presumably thin wrappers around those
// calls; confirm error handling against the implementation file.
00249 void create_thread(pthread_t *tid, pthread_attr_t *attr, void *(*func)(void *),
00250 void *param);
00251 void join_thread(pthread_t *tid, void **status);
00252
00253 }
00254 }
00255
00256 #endif