AsynchronousInterface2.h

Go to the documentation of this file.
00001 /*
00002  * VlaData.h
00003  *
00004  *  Created on: Sep 21, 2011
00005  *      Author: jjacobs
00006  */
00007 
00008 #ifndef ASYNCHRONOUS_INTERFACE2_H_
00009 #define ASYNCHRONOUS_INTERFACE2_H_
00010 
00011 #include "AsynchronousTools.h"
00012 #include "UtilJ.h"
00013 
00014 using casa::utilj::ThreadTimes;
00015 using casa::utilj::DeltaThreadTimes;
00016 
00017 #include <casa/Arrays/Cube.h>
00018 #include <casa/Arrays/Matrix.h>
00019 #include <casa/Arrays/Vector.h>
00020 #include <casa/Containers/Block.h>
00021 #include <casa/Quanta/MVRadialVelocity.h>
00022 #include <measures/Measures/MRadialVelocity.h>
00023 #include <measures/Measures/MDoppler.h>
00024 #include <msvis/MSVis/VisBufferImplAsync2.h>
00025 #include <msvis/MSVis/VisibilityIterator2.h>
00026 #include <msvis/MSVis/VisibilityIteratorImpl2.h>
00027 
00029 #include <memory>
00030 #include <queue>
00031 #include <vector>
00032 #include <sstream>
00033 
00034 namespace casa {
00035 
00036 namespace vi {
00037 
00038 class VisibilityIterator2;
00039 
00040 class RoviaModifier {
00041 public:
00042 
00043     virtual ~RoviaModifier () {}
00044     virtual void apply (VisibilityIterator2 *) const = 0;
00045     inline operator std::string( ) const {
00046         std::stringstream ss;
00047         print(ss);
00048         return ss.str( );
00049     }
00050 
00051 private:
00052 
00053     virtual void print (std::ostream & o) const = 0;
00054 
00055 };
00056 
00057 class ChannelSelection {
00058 
00059 public:
00060 
00061     ChannelSelection () {}
00062 
00063     ChannelSelection (const Block< Vector<Int> > & blockNGroup,
00064                       const Block< Vector<Int> > & blockStart,
00065                       const Block< Vector<Int> > & blockWidth,
00066                       const Block< Vector<Int> > & blockIncr,
00067                       const Block< Vector<Int> > & blockSpw);
00068 
00069     ChannelSelection (const ChannelSelection & other);
00070     ChannelSelection & operator= (const ChannelSelection & other);
00071 
00072 
00073     void
00074     get (Block< Vector<Int> > & blockNGroup,
00075          Block< Vector<Int> > & blockStart,
00076          Block< Vector<Int> > & blockWidth,
00077          Block< Vector<Int> > & blockIncr,
00078          Block< Vector<Int> > & blockSpw) const;
00079 
00080 protected:
00081 
00082     void copyBlock (const Block <Vector<Int> > & src,
00083                     Block <Vector<Int> > & to) const;
00084 
00085 private:
00086 
00087     Block< Vector<Int> > blockNGroup_p;
00088     Block< Vector<Int> > blockStart_p;
00089     Block< Vector<Int> > blockWidth_p;
00090     Block< Vector<Int> > blockIncr_p;
00091     Block< Vector<Int> > blockSpw_p;
00092 };
00093 
00094 
00095 class SelectChannelModifier : public RoviaModifier {
00096 
00097 public:
00098 
00099     SelectChannelModifier (Int nGroup, Int start, Int width, Int increment, Int spectralWindow);
00100     SelectChannelModifier (const Block< Vector<Int> > & blockNGroup,
00101                            const Block< Vector<Int> > & blockStart,
00102                            const Block< Vector<Int> > & blockWidth,
00103                            const Block< Vector<Int> > & blockIncr,
00104                            const Block< Vector<Int> > & blockSpw);
00105 
00106     void apply (VisibilityIterator2 *) const;
00107 
00108 private:
00109 
00110     Bool channelBlocks_p;
00111     ChannelSelection channelSelection_p;
00112     Int increment_p;
00113     Int nGroup_p;
00114     Int spectralWindow_p;
00115     Int start_p;
00116     Int width_p;
00117 
00118     void print (std::ostream & o) const;
00119     String toCsv (const Block< Vector<Int> > & bv) const;
00120     String toCsv (const Vector<Int> & v) const;
00121 
00122 };
00123 
00124 class SetIntervalModifier : public RoviaModifier {
00125 
00126 public:
00127 
00128     SetIntervalModifier  (Double timeInterval);
00129     void apply (VisibilityIterator2 *) const;
00130 
00131 private:
00132 
00133     Double timeInterval_p;
00134 
00135     void print (std::ostream & o) const;
00136 };
00137 
00138 
00139 class SetRowBlockingModifier : public RoviaModifier {
00140 
00141 public:
00142 
00143     SetRowBlockingModifier (Int nRows);
00144     void apply (VisibilityIterator2 *) const;
00145 
00146 private:
00147 
00148     Int nRows_p;
00149     Int nGroup_p;
00150     Int spectralWindow_p;
00151     Int start_p;
00152     Int width_p;
00153 
00154     void print (std::ostream & o) const;
00155 };
00156 
00157 class RoviaModifiers {
00158 
00159 public:
00160 
00161     ~RoviaModifiers ();
00162 
00163     void add (RoviaModifier *);
00164     void apply (VisibilityIterator2 *);
00165     void clearAndFree ();
00166     RoviaModifiers transferModifiers ();
00167 
00168 private:
00169 
00170     typedef std::vector<RoviaModifier *> Data;
00171     Data data_p;
00172 
00173 };
00174 
00175 class SelectVelocityModifier : public RoviaModifier {
00176 
00177 public:
00178 
00179     SelectVelocityModifier (Int nChan, const MVRadialVelocity& vStart, const MVRadialVelocity& vInc,
00180                             MRadialVelocity::Types rvType, MDoppler::Types dType, Bool precise);
00181     void apply (VisibilityIterator2 *) const;
00182 
00183 private:
00184 
00185     MDoppler::Types dType_p;
00186     Int nChan_p;
00187     Bool precise_p;
00188     MRadialVelocity::Types rvType_p;
00189     MVRadialVelocity vInc_p;
00190     MVRadialVelocity vStart_p;
00191 
00192     virtual void print (std::ostream & o) const;
00193 
00194 };
00195 
00196 
00197 // <summary>
00198 //    VlaDatum is a single elemement in the VlaDatum buffer ring used to support the
00199 //    VisibilityIterator2Async.
00200 // </summary>
00201 
00202 // <use visibility=local>
00203 
00204 // <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos="">
00205 // </reviewed>
00206 
00207 // <prerequisite>
00208 //   <li> VisBuffer
00209 //   <li> VisBufferAsync2
00210 //   <li> VisibilityIterator2Async
00211 //   <li> VlaData
00212 //   <li> VLAT
00213 // </prerequisite>
00214 //
00215 // <etymology>
00216 //    VlaDatum is the quantum of data associated with a single position of the visibility
00217 //    look-ahead scheme.
00218 // </etymology>
00219 //
00220 // <synopsis>
00221 //    VlaDatum is a single buffer for data produced by the VLAT thread and consumed by the
00222 //    main thread.  A collection of VlaDatum objects is organized as a buffer ring in a
00223 //    VlaData object.
00224 //
00225 //    A VlaDatum object is responsible for maintaining its state as well as containing the set
00226 //    of data accessed from a single position of a VisibilityIterator2.  It contains a
00227 //    VisibilityBufferAsync object to hold the data that will be used by the main thread; other
00228 //    data is maintained in member variables.
00229 //
00230 //    VlaDatum has no concurrency mechanisms built in it; that is handled by the VlaData object.
00231 //    It does support a set of states that indicate its current use:
00232 //        Empty -> Filling -> Full -> Reading -> Empty.
00233 //    Changing state is accomplished by the methods fillStart, fillComplete, readStart and readComplete.
00234 // </synopsis>
00235 //
00236 // <example>
00237 // </example>
00238 //
00239 // <motivation>
00240 // </motivation>
00241 //
00242 // <thrown>
00243 //    <li>AipsError for unhandleable errors
00244 // </thrown>
00245 //
00246 // <todo asof="yyyy/mm/dd">
00247 // </todo>
00248 
00249 
00250 class VlaDatum {
00251 
00252 public:
00253 
00254     typedef enum {Empty, Filling, Full, Reading} State;
00255 
00256     VlaDatum (Subchunk);
00257     ~VlaDatum ();
00258 
00259     Subchunk  getSubchunk () const;
00260     VisBufferImplAsync2 * getVisBuffer ();
00261     //const VisBufferAsync2 * getVisBuffer () const;
00262     Bool isSubchunk (Subchunk) const;
00263 
00264     VisBufferImplAsync2 * releaseVisBufferAsync2 ();
00265     void reset ();
00266 
00267 protected:
00268 
00269 private:
00270 
00271     Subchunk     subchunk_p;
00272     VisBufferImplAsync2 * visBuffer_p;
00273 
00274     // Illegal operations
00275 
00276     VlaDatum & operator= (const VlaDatum & other);
00277 
00278 };
00279 
00280 class VLAT;
00281 
00282 // <summary>
00283 //    The VlaData class is a buffer ring used to support the communication between
00284 //    the visiblity lookahead thread (VLAT) and the main application thread.  It
00285 //    implements the required concurrency control to support this communication.
00286 // </summary>
00287 
00288 // <use visibility=local>
00289 
00290 // <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos="">
00291 // </reviewed>
00292 
00293 // <prerequisite>
00294 //   <li> VisBuffer
00295 //   <li> VisBufferImplAsync2
00296 //   <li> VisibilityIterator2Async
00297 //   <li> VlaData
00298 //   <li> VLAT
00299 // </prerequisite>
00300 //
00301 // <etymology>
00302 //    VlaData is the entire collection of visibility look-ahead data currently (or potentially)
00303 //    shared between the lookahead and main threads.
00304 // </etymology>
00305 //
00306 // <synopsis>
00307 //    The VlaData object supports the sharing of information between the VLAT look ahead thread and
00308 //    the main thread.  It contains a buffer ring of VlaDatum objects which each hold all of the
00309 //    data that is normally access by a position of a VisibiltyIterator.  Other data that is shared
00310 //    or communicated between the two threads is also managed by VlaData.
00311 //
00312 //    A single mutex (member variable mutex_p) is used to protect data that is shared between the
00313 //    two threads.  In addition there is a single PThreads condition variable, vlaDataChanged_p used
00314 //    to allow either thread to wait for the other to change the state of VlaData object.
00315 //
00316 //    Buffer ring concurrency has two levels: the mutex protecting VlaData and the state of the
00317 //    VlaDatum object.  Whenever a free buffer (VlaDatum object) is available the VLAT thread will
00318 //    fill it with the data from the next position of the VisibilityIterator2; a buffer is free for
00319 //    filling when it is in the Empty state.  Before the VLAT fills a buffer it must call fillStart;
00320 //    this method will block the caller until the next buffer in the ring becomes free; as a side effect
00321 //    the buffer's state becomes Filling.  After fillStart is complete, the VLAT owns the buffer.
00322 //    When the VLAT is done with the buffer it calls fillComplete to relinquish the buffer; this causes
00323 //    the buffer state to change from Filling to Full.
00324 //
00325 //    The main thread calls readStart to get the next filled buffer; the main thread is blocked until
00326 //    the a filled buffer is available.  When the full buffer is ready its state is changed to Reading
00327 //    and readStart returns.  The VLAT thread will not access the buffer while the main thread is reading.
00328 //    The read operation is terminated by calling readComplete; this changes the buffer state to Empty and
00329 //    does not block the main thread except potentially to acquire the mutex.
00330 //
00331 //    The concurrency scheme is fairly sound except for the possibility of low-level data sharing through
00332 //    CASA data structures.  Some of the CASA containers (e.g., Array<T>) can potentially share storage
00333 //    although it appears that normal operation they do not.  Some problems have been encountered with
00334 //    objects that share data via reference-counted pointers.  For instance, objects derived from
00335 //    MeasBase<T,U> (e.g., MDirection, MPosition, MEpoch, etc.) share the object that serves as the
00336 //    frame of reference for the measurement; only by converting the object to text and back again can
00337 //    a user easily obtain a copy which does not share values with another.  It is possible that other
00338 //    objects deep many layers down a complex object may still be waiting to trip up VlaData's
00339 //    concurrency scheme.
00340 //
00341 //    On unusual interaction mediated by VlaData occurs when it is necessary to reset the visibility
00342 //    iterator back to the start of a MeasurementSet.  This usually happens either at the start of the MS
00343 //    sweep (e.g., to reset the row blocking factor of the iterator) or at the end (e.g., to make an
00344 //    additional pass through the MS).  The main thread requests a reset of the VI and then is blocked
00345 //    until the VI is reset.  The sweepTerminationRequested_p variable is set to true; when the VLAT
00346 //    discovers that this variable is true it resets the buffer ring, repositions its VI to the start
00347 //    of the MS and then informs the blocked main thread by setting viResetComplete to true and
00348 //    signalling vlaDataChanged_p.
00349 // </synopsis>
00350 //
00351 // <example>
00352 // </example>
00353 //
00354 // <motivation>
00355 // </motivation>
00356 //
00357 // <thrown>
00358 //    <li> AipsError
00359 // </thrown>
00360 //
00361 // <todo asof="yyyy/mm/dd">
00362 // </todo>
00363 
00364 class AsynchronousInterface;
00365 class InterfaceController;
00366 
00367 class VlaData {
00368 
00369 public:
00370 
00371     VlaData (Int maxNBuffers, async::Mutex & mutex);
00372     ~VlaData ();
00373 
00374     Bool fillCanStart () const;
00375     void fillComplete (VlaDatum * datum);
00376     VlaDatum * fillStart (Subchunk, const ThreadTimes & fillStartTime);
00377     ChannelSelection getChannelSelection () const;
00378     void initialize (const AsynchronousInterface *);
00379     void insertValidChunk (Int chunkNumber);
00380     void insertValidSubChunk (Subchunk);
00381     Bool isValidChunk (Int chunkNumber) const;
00382     Bool isValidSubChunk (Subchunk) const;
00383     void readComplete (Subchunk);
00384     VisBufferImplAsync2 * readStart (Subchunk);
00385     void resetBufferData ();
00386     void setNoMoreData ();
00387     void storeChannelSelection (const ChannelSelection & channelSelection);
00388 
00389     //static void debugBlock ();
00390     //static void debugUnblock ();
00391     //static Bool logThis (Int level);
00392 
00393     //static Bool loggingInitialized_p;
00394     //static Int logLevel_p;
00395 
00396 protected:
00397 
00398 private:
00399 
00400     typedef std::queue<VlaDatum *> Data;
00401     typedef std::queue<Int> ValidChunks;
00402     typedef std::queue<Subchunk> ValidSubChunks;
00403 
00404     class Timing {
00405     public:
00406         ThreadTimes      fill1_p;
00407         ThreadTimes      fill2_p;
00408         ThreadTimes      fill3_p;
00409         DeltaThreadTimes fillCycle_p;
00410         DeltaThreadTimes fillOperate_p;
00411         DeltaThreadTimes fillWait_p;
00412         ThreadTimes      read1_p;
00413         ThreadTimes      read2_p;
00414         ThreadTimes      read3_p;
00415         DeltaThreadTimes readCycle_p;
00416         DeltaThreadTimes readOperate_p;
00417         DeltaThreadTimes readWait_p;
00418         ThreadTimes      timeStart_p;
00419         ThreadTimes      timeStop_p;
00420     };
00421 
00422     ChannelSelection              channelSelection_p; // last channels selected for the VI in use
00423     Data                          data_p;             // Buffer queue
00424     const AsynchronousInterface * interface_p;
00425     const Int                     MaxNBuffers_p;
00426     async::Mutex &                mutex_p; // provided by Asynchronous interface
00427     Timing                        timing_p;
00428     mutable ValidChunks           validChunks_p;       // Queue of valid chunk numbers
00429     mutable ValidSubChunks        validSubChunks_p; // Queue of valid subchunk pairs
00430 
00431 
00432     Int clock (Int arg, Int base);
00433     String makeReport ();
00434 
00435     Bool statsEnabled () const;
00436     void terminateSweep ();
00437 
00439 
00440     static Bool initializeLogging ();
00441 
00442     // Illegal operations
00443 
00444     VlaData (const VlaData & other);
00445     VlaData & operator= (const VlaData & other);
00446 };
00447 
00448 class WriteData {
00449 
00450 public:
00451 
00452     WriteData (const Subchunk & subchunkPair) : subchunkPair_p (subchunkPair) {}
00453 
00454     virtual ~WriteData () {}
00455 
00456     Subchunk getsubchunk () const { return subchunkPair_p;}
00457     virtual void write (VisibilityIterator2 * vi) = 0;
00458 
00459 private:
00460 
00461     Subchunk subchunkPair_p;
00462 
00463 };
00464 
00465 template <typename Data>
00466 class WriteDataImpl : public WriteData {
00467 public:
00468 
00469     typedef void (VisibilityIterator2::* Setter) (const Data &);
00470 
00471     WriteDataImpl (const Subchunk & subchunkPair,
00472                    const Data & data,
00473                    Setter setter)
00474     : WriteData (subchunkPair),
00475       data_p (),
00476       setter_p (setter)
00477     {
00478         data_p.assign (data); // Make a pure copy
00479     }
00480 
00481     void
00482     write (VisibilityIterator2 * vi)
00483     {
00484         (vi ->* setter_p) (data_p);
00485     }
00486 
00487 private:
00488 
00489     Data   data_p;
00490     Setter setter_p;
00491 
00492 };
00493 
00494 template <typename Data>
00495 WriteData *
00496 createWriteData (const Subchunk & subchunkPair,
00497                  const Data & data,
00498                  void (VisibilityIterator2::* setter) (const Data &))
00499 {
00500     return new WriteDataImpl<Data> (subchunkPair, data, setter);
00501 }
00502 
00503 template <typename Data>
00504 class WriteDataImpl2 : public WriteData {
00505 public:
00506 
00507     typedef VisibilityIterator2::DataColumn DataColumn;
00508     typedef void (VisibilityIterator2::* Setter) (const Data &, DataColumn);
00509 
00510     WriteDataImpl2 (const Subchunk & subchunkPair,
00511                     const Data & data,
00512                     DataColumn dataColumn,
00513                     Setter setter)
00514     : WriteData (subchunkPair),
00515       data_p (),
00516       dataColumn_p (dataColumn),
00517       setter_p (setter)
00518     {
00519         data_p.assign (data); // Make a pure copy
00520     }
00521 
00522     void
00523     write (VisibilityIterator2 * vi)
00524     {
00525         (vi ->* setter_p) (data_p, dataColumn_p);
00526     }
00527 
00528 private:
00529 
00530     Data       data_p;
00531     DataColumn dataColumn_p;
00532     Setter     setter_p;
00533 };
00534 
00535 template <typename Data>
00536 WriteData *
00537 createWriteData (const Subchunk & subchunkPair,
00538                  const Data & data,
00539                  VisibilityIterator2::DataColumn dataColumn,
00540                  void (VisibilityIterator2::* setter) (const Data &, VisibilityIterator2::DataColumn))
00541 {
00542     return new WriteDataImpl2 <Data> (subchunkPair, data, dataColumn, setter);
00543 }
00544 
00545 class AsynchronousInterface;
00546 
00547 class WriteQueue {
00548 
00549 public:
00550 
00551     WriteQueue ();
00552     ~WriteQueue ();
00553 
00554     WriteData * dequeue ();
00555     Bool empty (Bool alreadyLocked = False);
00556     void enqueue (WriteData * writeData);
00557 
00558     void initialize (const AsynchronousInterface *);
00559 
00560     void write (VisibilityIterator2 * vi);
00561 
00562 private:
00563 
00564     const AsynchronousInterface * interface_p;
00565     async::Mutex mutex_p;
00566     std::queue<WriteData *> queue_p;
00567 };
00568 
00569 
00570 class AsynchronousInterface {
00571 
00572     //friend class InterfaceController;
00573 
00574 public:
00575 
00576     AsynchronousInterface( const AsynchronousInterface& ) = delete;
00577     AsynchronousInterface& operator=( const AsynchronousInterface& ) = delete;
00578     AsynchronousInterface (int maxNBuffers);
00579     ~AsynchronousInterface ();
00580 
00581     void addModifier (RoviaModifier * modifier);
00582     async::Mutex & getMutex () const;
00583     //async::LockGuard getLockGuard () const;
00584     VlaData * getVlaData ();
00585     VLAT * getVlat ();
00586     WriteQueue & getWriteQueue ();
00587     void initialize ();
00588     Bool isSweepTerminationRequested () const;
00589     Bool isLookaheadTerminationRequested () const;
00590     void notifyAllInterfaceChanged () const;
00591     void requestViReset ();
00592     pair<Bool, RoviaModifiers> resetVi ();
00593     void terminate ();
00594     void terminateLookahead ();
00595     void terminateSweep ();
00596     RoviaModifiers transferRoviaModifiers ();
00597     void viResetComplete ();
00598     Bool viResetRequested ();
00599     void waitForInterfaceChange (async::UniqueLock & uniqueLock) const;
00600 
00601     static Bool initializeLogging ();
00602     static Bool logThis (Int level);
00603 
00604 private:
00605 
00606     mutable async::Condition  interfaceDataChanged_p; // Signals interface data has changed
00607                                                       // o VisBuffer consumed
00608                                                       // o Write data queued
00609                                                       // o Sweep or thread termination requested
00610     volatile Bool             lookaheadTerminationRequested_p; // True to request thread termination
00611     mutable async::Mutex      mutex_p;                // Mutex protecting access to concurrent data
00612     RoviaModifiers            roviaModifiers_p;
00613     volatile Bool             sweepTerminationRequested_p;     // True to request sweep termination
00614                                                                // (e.g., prior to rewinding
00615     volatile Bool             viResetComplete_p; // VI reset process has completed
00616     volatile Bool             viResetRequested_p; // VI reset has been requested
00617     VlaData                   vlaData_p;          // Lookahead data
00618     VLAT *                    vlat_p;             // Lookahead thread
00619     WriteQueue                writeQueue_p;       // Data to be written (writable VIs only)
00620 
00621     static Bool loggingInitialized_p;
00622     static Int logLevel_p;
00623 };
00624 
00625 } // end namespace vi
00626 
00627 } // end namespace casa
00628 
00629 #endif /* ASYNCHRONOUS_INTERFACE_H_ */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 31 Aug 2016 for casa by  doxygen 1.6.1