00001
00002
00003
00004
00005
00006
00007
00008 #ifndef ASYNCHRONOUS_INTERFACE2_H_
00009 #define ASYNCHRONOUS_INTERFACE2_H_
00010
00011 #include "AsynchronousTools.h"
00012 #include "UtilJ.h"
00013
00014 using casa::utilj::ThreadTimes;
00015 using casa::utilj::DeltaThreadTimes;
00016
00017 #include <casa/Arrays/Cube.h>
00018 #include <casa/Arrays/Matrix.h>
00019 #include <casa/Arrays/Vector.h>
00020 #include <casa/Containers/Block.h>
00021 #include <casa/Quanta/MVRadialVelocity.h>
00022 #include <measures/Measures/MRadialVelocity.h>
00023 #include <measures/Measures/MDoppler.h>
00024 #include <msvis/MSVis/VisBufferImplAsync2.h>
00025 #include <msvis/MSVis/VisibilityIterator2.h>
00026 #include <msvis/MSVis/VisibilityIteratorImpl2.h>
00027
00029 #include <memory>
00030 #include <queue>
00031 #include <vector>
00032 #include <sstream>
00033
00034 namespace casa {
00035
00036 namespace vi {
00037
00038 class VisibilityIterator2;
00039
00040 class RoviaModifier {
00041 public:
00042
00043 virtual ~RoviaModifier () {}
00044 virtual void apply (VisibilityIterator2 *) const = 0;
00045 inline operator std::string( ) const {
00046 std::stringstream ss;
00047 print(ss);
00048 return ss.str( );
00049 }
00050
00051 private:
00052
00053 virtual void print (std::ostream & o) const = 0;
00054
00055 };
00056
00057 class ChannelSelection {
00058
00059 public:
00060
00061 ChannelSelection () {}
00062
00063 ChannelSelection (const Block< Vector<Int> > & blockNGroup,
00064 const Block< Vector<Int> > & blockStart,
00065 const Block< Vector<Int> > & blockWidth,
00066 const Block< Vector<Int> > & blockIncr,
00067 const Block< Vector<Int> > & blockSpw);
00068
00069 ChannelSelection (const ChannelSelection & other);
00070 ChannelSelection & operator= (const ChannelSelection & other);
00071
00072
00073 void
00074 get (Block< Vector<Int> > & blockNGroup,
00075 Block< Vector<Int> > & blockStart,
00076 Block< Vector<Int> > & blockWidth,
00077 Block< Vector<Int> > & blockIncr,
00078 Block< Vector<Int> > & blockSpw) const;
00079
00080 protected:
00081
00082 void copyBlock (const Block <Vector<Int> > & src,
00083 Block <Vector<Int> > & to) const;
00084
00085 private:
00086
00087 Block< Vector<Int> > blockNGroup_p;
00088 Block< Vector<Int> > blockStart_p;
00089 Block< Vector<Int> > blockWidth_p;
00090 Block< Vector<Int> > blockIncr_p;
00091 Block< Vector<Int> > blockSpw_p;
00092 };
00093
00094
00095 class SelectChannelModifier : public RoviaModifier {
00096
00097 public:
00098
00099 SelectChannelModifier (Int nGroup, Int start, Int width, Int increment, Int spectralWindow);
00100 SelectChannelModifier (const Block< Vector<Int> > & blockNGroup,
00101 const Block< Vector<Int> > & blockStart,
00102 const Block< Vector<Int> > & blockWidth,
00103 const Block< Vector<Int> > & blockIncr,
00104 const Block< Vector<Int> > & blockSpw);
00105
00106 void apply (VisibilityIterator2 *) const;
00107
00108 private:
00109
00110 Bool channelBlocks_p;
00111 ChannelSelection channelSelection_p;
00112 Int increment_p;
00113 Int nGroup_p;
00114 Int spectralWindow_p;
00115 Int start_p;
00116 Int width_p;
00117
00118 void print (std::ostream & o) const;
00119 String toCsv (const Block< Vector<Int> > & bv) const;
00120 String toCsv (const Vector<Int> & v) const;
00121
00122 };
00123
00124 class SetIntervalModifier : public RoviaModifier {
00125
00126 public:
00127
00128 SetIntervalModifier (Double timeInterval);
00129 void apply (VisibilityIterator2 *) const;
00130
00131 private:
00132
00133 Double timeInterval_p;
00134
00135 void print (std::ostream & o) const;
00136 };
00137
00138
00139 class SetRowBlockingModifier : public RoviaModifier {
00140
00141 public:
00142
00143 SetRowBlockingModifier (Int nRows);
00144 void apply (VisibilityIterator2 *) const;
00145
00146 private:
00147
00148 Int nRows_p;
00149 Int nGroup_p;
00150 Int spectralWindow_p;
00151 Int start_p;
00152 Int width_p;
00153
00154 void print (std::ostream & o) const;
00155 };
00156
00157 class RoviaModifiers {
00158
00159 public:
00160
00161 ~RoviaModifiers ();
00162
00163 void add (RoviaModifier *);
00164 void apply (VisibilityIterator2 *);
00165 void clearAndFree ();
00166 RoviaModifiers transferModifiers ();
00167
00168 private:
00169
00170 typedef std::vector<RoviaModifier *> Data;
00171 Data data_p;
00172
00173 };
00174
00175 class SelectVelocityModifier : public RoviaModifier {
00176
00177 public:
00178
00179 SelectVelocityModifier (Int nChan, const MVRadialVelocity& vStart, const MVRadialVelocity& vInc,
00180 MRadialVelocity::Types rvType, MDoppler::Types dType, Bool precise);
00181 void apply (VisibilityIterator2 *) const;
00182
00183 private:
00184
00185 MDoppler::Types dType_p;
00186 Int nChan_p;
00187 Bool precise_p;
00188 MRadialVelocity::Types rvType_p;
00189 MVRadialVelocity vInc_p;
00190 MVRadialVelocity vStart_p;
00191
00192 virtual void print (std::ostream & o) const;
00193
00194 };
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250 class VlaDatum {
00251
00252 public:
00253
00254 typedef enum {Empty, Filling, Full, Reading} State;
00255
00256 VlaDatum (Subchunk);
00257 ~VlaDatum ();
00258
00259 Subchunk getSubchunk () const;
00260 VisBufferImplAsync2 * getVisBuffer ();
00261
00262 Bool isSubchunk (Subchunk) const;
00263
00264 VisBufferImplAsync2 * releaseVisBufferAsync2 ();
00265 void reset ();
00266
00267 protected:
00268
00269 private:
00270
00271 Subchunk subchunk_p;
00272 VisBufferImplAsync2 * visBuffer_p;
00273
00274
00275
00276 VlaDatum & operator= (const VlaDatum & other);
00277
00278 };
00279
00280 class VLAT;
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364 class AsynchronousInterface;
00365 class InterfaceController;
00366
00367 class VlaData {
00368
00369 public:
00370
00371 VlaData (Int maxNBuffers, async::Mutex & mutex);
00372 ~VlaData ();
00373
00374 Bool fillCanStart () const;
00375 void fillComplete (VlaDatum * datum);
00376 VlaDatum * fillStart (Subchunk, const ThreadTimes & fillStartTime);
00377 ChannelSelection getChannelSelection () const;
00378 void initialize (const AsynchronousInterface *);
00379 void insertValidChunk (Int chunkNumber);
00380 void insertValidSubChunk (Subchunk);
00381 Bool isValidChunk (Int chunkNumber) const;
00382 Bool isValidSubChunk (Subchunk) const;
00383 void readComplete (Subchunk);
00384 VisBufferImplAsync2 * readStart (Subchunk);
00385 void resetBufferData ();
00386 void setNoMoreData ();
00387 void storeChannelSelection (const ChannelSelection & channelSelection);
00388
00389
00390
00391
00392
00393
00394
00395
00396 protected:
00397
00398 private:
00399
00400 typedef std::queue<VlaDatum *> Data;
00401 typedef std::queue<Int> ValidChunks;
00402 typedef std::queue<Subchunk> ValidSubChunks;
00403
00404 class Timing {
00405 public:
00406 ThreadTimes fill1_p;
00407 ThreadTimes fill2_p;
00408 ThreadTimes fill3_p;
00409 DeltaThreadTimes fillCycle_p;
00410 DeltaThreadTimes fillOperate_p;
00411 DeltaThreadTimes fillWait_p;
00412 ThreadTimes read1_p;
00413 ThreadTimes read2_p;
00414 ThreadTimes read3_p;
00415 DeltaThreadTimes readCycle_p;
00416 DeltaThreadTimes readOperate_p;
00417 DeltaThreadTimes readWait_p;
00418 ThreadTimes timeStart_p;
00419 ThreadTimes timeStop_p;
00420 };
00421
00422 ChannelSelection channelSelection_p;
00423 Data data_p;
00424 const AsynchronousInterface * interface_p;
00425 const Int MaxNBuffers_p;
00426 async::Mutex & mutex_p;
00427 Timing timing_p;
00428 mutable ValidChunks validChunks_p;
00429 mutable ValidSubChunks validSubChunks_p;
00430
00431
00432 Int clock (Int arg, Int base);
00433 String makeReport ();
00434
00435 Bool statsEnabled () const;
00436 void terminateSweep ();
00437
00439
00440 static Bool initializeLogging ();
00441
00442
00443
00444 VlaData (const VlaData & other);
00445 VlaData & operator= (const VlaData & other);
00446 };
00447
00448 class WriteData {
00449
00450 public:
00451
00452 WriteData (const Subchunk & subchunkPair) : subchunkPair_p (subchunkPair) {}
00453
00454 virtual ~WriteData () {}
00455
00456 Subchunk getsubchunk () const { return subchunkPair_p;}
00457 virtual void write (VisibilityIterator2 * vi) = 0;
00458
00459 private:
00460
00461 Subchunk subchunkPair_p;
00462
00463 };
00464
00465 template <typename Data>
00466 class WriteDataImpl : public WriteData {
00467 public:
00468
00469 typedef void (VisibilityIterator2::* Setter) (const Data &);
00470
00471 WriteDataImpl (const Subchunk & subchunkPair,
00472 const Data & data,
00473 Setter setter)
00474 : WriteData (subchunkPair),
00475 data_p (),
00476 setter_p (setter)
00477 {
00478 data_p.assign (data);
00479 }
00480
00481 void
00482 write (VisibilityIterator2 * vi)
00483 {
00484 (vi ->* setter_p) (data_p);
00485 }
00486
00487 private:
00488
00489 Data data_p;
00490 Setter setter_p;
00491
00492 };
00493
00494 template <typename Data>
00495 WriteData *
00496 createWriteData (const Subchunk & subchunkPair,
00497 const Data & data,
00498 void (VisibilityIterator2::* setter) (const Data &))
00499 {
00500 return new WriteDataImpl<Data> (subchunkPair, data, setter);
00501 }
00502
00503 template <typename Data>
00504 class WriteDataImpl2 : public WriteData {
00505 public:
00506
00507 typedef VisibilityIterator2::DataColumn DataColumn;
00508 typedef void (VisibilityIterator2::* Setter) (const Data &, DataColumn);
00509
00510 WriteDataImpl2 (const Subchunk & subchunkPair,
00511 const Data & data,
00512 DataColumn dataColumn,
00513 Setter setter)
00514 : WriteData (subchunkPair),
00515 data_p (),
00516 dataColumn_p (dataColumn),
00517 setter_p (setter)
00518 {
00519 data_p.assign (data);
00520 }
00521
00522 void
00523 write (VisibilityIterator2 * vi)
00524 {
00525 (vi ->* setter_p) (data_p, dataColumn_p);
00526 }
00527
00528 private:
00529
00530 Data data_p;
00531 DataColumn dataColumn_p;
00532 Setter setter_p;
00533 };
00534
00535 template <typename Data>
00536 WriteData *
00537 createWriteData (const Subchunk & subchunkPair,
00538 const Data & data,
00539 VisibilityIterator2::DataColumn dataColumn,
00540 void (VisibilityIterator2::* setter) (const Data &, VisibilityIterator2::DataColumn))
00541 {
00542 return new WriteDataImpl2 <Data> (subchunkPair, data, dataColumn, setter);
00543 }
00544
00545 class AsynchronousInterface;
00546
00547 class WriteQueue {
00548
00549 public:
00550
00551 WriteQueue ();
00552 ~WriteQueue ();
00553
00554 WriteData * dequeue ();
00555 Bool empty (Bool alreadyLocked = False);
00556 void enqueue (WriteData * writeData);
00557
00558 void initialize (const AsynchronousInterface *);
00559
00560 void write (VisibilityIterator2 * vi);
00561
00562 private:
00563
00564 const AsynchronousInterface * interface_p;
00565 async::Mutex mutex_p;
00566 std::queue<WriteData *> queue_p;
00567 };
00568
00569
00570 class AsynchronousInterface {
00571
00572
00573
00574 public:
00575
00576 AsynchronousInterface( const AsynchronousInterface& ) = delete;
00577 AsynchronousInterface& operator=( const AsynchronousInterface& ) = delete;
00578 AsynchronousInterface (int maxNBuffers);
00579 ~AsynchronousInterface ();
00580
00581 void addModifier (RoviaModifier * modifier);
00582 async::Mutex & getMutex () const;
00583
00584 VlaData * getVlaData ();
00585 VLAT * getVlat ();
00586 WriteQueue & getWriteQueue ();
00587 void initialize ();
00588 Bool isSweepTerminationRequested () const;
00589 Bool isLookaheadTerminationRequested () const;
00590 void notifyAllInterfaceChanged () const;
00591 void requestViReset ();
00592 pair<Bool, RoviaModifiers> resetVi ();
00593 void terminate ();
00594 void terminateLookahead ();
00595 void terminateSweep ();
00596 RoviaModifiers transferRoviaModifiers ();
00597 void viResetComplete ();
00598 Bool viResetRequested ();
00599 void waitForInterfaceChange (async::UniqueLock & uniqueLock) const;
00600
00601 static Bool initializeLogging ();
00602 static Bool logThis (Int level);
00603
00604 private:
00605
00606 mutable async::Condition interfaceDataChanged_p;
00607
00608
00609
00610 volatile Bool lookaheadTerminationRequested_p;
00611 mutable async::Mutex mutex_p;
00612 RoviaModifiers roviaModifiers_p;
00613 volatile Bool sweepTerminationRequested_p;
00614
00615 volatile Bool viResetComplete_p;
00616 volatile Bool viResetRequested_p;
00617 VlaData vlaData_p;
00618 VLAT * vlat_p;
00619 WriteQueue writeQueue_p;
00620
00621 static Bool loggingInitialized_p;
00622 static Int logLevel_p;
00623 };
00624
00625 }
00626
00627 }
00628
00629 #endif