00001 // -*- C++ -*- 00002 //# ThreadCoordinator.h: Definition of the ThreadCoordinator class 00003 //# Copyright (C) 1997,1998,1999,2000,2001,2002,2003 00004 //# Associated Universities, Inc. Washington DC, USA. 00005 //# 00006 //# This library is free software; you can redistribute it and/or modify it 00007 //# under the terms of the GNU Library General Public License as published by 00008 //# the Free Software Foundation; either version 2 of the License, or (at your 00009 //# option) any later version. 00010 //# 00011 //# This library is distributed in the hope that it will be useful, but WITHOUT 00012 //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00013 //# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 00014 //# License for more details. 00015 //# 00016 //# You should have received a copy of the GNU Library General Public License 00017 //# along with this library; if not, write to the Free Software Foundation, 00018 //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA. 00019 //# 00020 //# Correspondence concerning AIPS++ should be addressed as follows: 00021 //# Internet email: aips2-request@nrao.edu. 00022 //# Postal address: AIPS++ Project Office 00023 //# National Radio Astronomy Observatory 00024 //# 520 Edgemont Road 00025 //# Charlottesville, VA 22903-2475 USA 00026 //# 00027 //# $Id$ 00028 00029 #include <casa/aips.h> 00030 #include <casa/aipstype.h> 00031 #include <cstddef> 00032 00033 //<example> 00034 // 00035 // Code in the master thread: 00036 // 00037 // ThreadCoordinator threadCoordinator (nThreads); 00038 // 00039 // // Create nThread threads and pass them a pointer to the thread coordinator 00040 // // object. Then enter the work loop 00041 // 00042 // for (int i = 0; i < nBuffers; i++){ 00043 // 00044 // // Provide access to the i-th buffer 00045 // 00046 // threadCoordinator->getToWork(& vb); // tell workers to hop to it! 00047 // // blocks if workers still busy 00048 // 00049 // } 00050 // 00051 // Code in each worker thread: 00052 // 00053 // while (True){ 00054 // 00055 // VisBuffer * workingBuffer = threadCoordinator_p->waitForWork (this); 00056 // if (workingBuffer == NULL) 00057 // break; 00058 // 00059 // doSomeWork(workingBuffer); 00060 // } 00061 // </example> 00062 00063 #ifndef SYNTHESIS_THREADCOORDINATOR_H 00064 #define SYNTHESIS_THREADCOORDINATOR_H 00065 00066 namespace casa { 00067 00068 class Barrier; 00069 00070 namespace async { 00071 class Condition; 00072 class Mutex; 00073 class Thread; 00074 } 00075 00076 class String; 00077 class VisBuffer; 00078 00079 class ThreadCoordinatorBase { 00080 00081 public: 00082 00083 virtual ~ThreadCoordinatorBase (); 00084 00085 void waitForWorkersToFinishTask (); 00086 00087 protected: 00088 00089 ThreadCoordinatorBase (Int nThreads, bool logStates); 00090 00091 00092 void dispatchWork (); 00093 void getToWork (); 00094 virtual void installWorkInfo () = 0; 00095 bool waitForWork (const async::Thread * thisThread); 00096 void waitForWorkersToReport (); 00097 Int nThreads_p; 00098 00099 00100 private: 00101 00102 Barrier * barrier_p; 00103 bool logStates_p; 00104 async::Mutex * mutex_p; 00105 volatile Int nThreadsAtBarrier_p; 00106 volatile Int nThreadsDispatched_p; 00107 volatile Bool readyForWork_p; 00108 async::Condition * stateChanged_p; 00109 const VisBuffer * vb_p; 00110 volatile bool workCompleted_p; 00111 volatile bool workToBeDone_p; 00112 00113 void logState (const String & tag) const; 00114 00115 }; 00116 00117 template <typename T> 00118 class ThreadCoordinator : public ThreadCoordinatorBase { 00119 00120 public: 00121 00122 ThreadCoordinator (Int nThreads, Bool logStates = False) : ThreadCoordinatorBase (nThreads, logStates) {} 00123 00124 void 00125 giveWorkToWorkers (T * workInfo) 00126 { 00127 workInfoInWaiting_p = workInfo; 00128 waitForWorkersToReport (); 00129 dispatchWork (); 00130 } 00131 00132 void 00133 getToWork (T * workInfo) 00134 { 00135 workInfoInWaiting_p = workInfo; 00136 ThreadCoordinatorBase::getToWork (); 00137 } 00138 00139 T * 00140 waitForWork (const async::Thread * thisThread) 00141 { 00142 bool ok = ThreadCoordinatorBase::waitForWork (thisThread); 00143 T * result = ok ? workInfo_p : NULL; 00144 00145 return result; 00146 } 00147 00148 void setNThreads(Int n) {nThreads_p=n;}; 00149 Int nThreads() {return nThreads_p;}; 00150 protected: 00151 00152 void 00153 installWorkInfo () 00154 { 00155 workInfo_p = workInfoInWaiting_p; 00156 workInfoInWaiting_p = NULL; 00157 } 00158 00159 00160 private: 00161 00162 T * workInfoInWaiting_p; 00163 T * workInfo_p; 00164 }; 00165 00166 } // end namespace casa 00167 #endif //