00001
00002
00003
00004
00005
00006
00007
00008 #ifndef ASYNCHRONOUS_INTERFACE_H_
00009 #define ASYNCHRONOUS_INTERFACE_H_
00010
00011 #include "AsynchronousTools.h"
00012 #include "UtilJ.h"
00013
00014 using casa::utilj::ThreadTimes;
00015 using casa::utilj::DeltaThreadTimes;
00016
00017 #include <casa/Arrays/Cube.h>
00018 #include <casa/Arrays/Matrix.h>
00019 #include <casa/Arrays/Vector.h>
00020 #include <casa/Containers/Block.h>
00021 #include <casa/Quanta/MVRadialVelocity.h>
00022 #include <measures/Measures/MRadialVelocity.h>
00023 #include <measures/Measures/MDoppler.h>
00024 #include <msvis/MSVis/VisBufferAsync.h>
00025 #include <msvis/MSVis/VisibilityIterator.h>
00026 #include <msvis/MSVis/VisibilityIteratorImpl.h>
00027
#include <memory>
#include <ostream>
#include <queue>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
00032
00033 namespace casa {
00034
00035 class ROVisibilityIterator;
00036
00037 namespace asyncio {
00038
00039 class RoviaModifier {
00040 public:
00041
00042 friend std::ostream & operator<< (std::ostream & o, const RoviaModifier & m);
00043
00044 virtual ~RoviaModifier () {}
00045 virtual void apply (ROVisibilityIterator *) const = 0;
00046
00047 inline operator std::string( ) const {
00048 std::stringstream ss;
00049 print(ss);
00050 return ss.str( );
00051 }
00052
00053 private:
00054
00055 virtual void print (std::ostream & o) const = 0;
00056
00057 };
00058
00059 class ChannelSelection {
00060
00061 public:
00062
00063 ChannelSelection () {}
00064
00065 ChannelSelection (const Block< Vector<Int> > & blockNGroup,
00066 const Block< Vector<Int> > & blockStart,
00067 const Block< Vector<Int> > & blockWidth,
00068 const Block< Vector<Int> > & blockIncr,
00069 const Block< Vector<Int> > & blockSpw);
00070
00071 ChannelSelection (const ChannelSelection & other);
00072 ChannelSelection & operator= (const ChannelSelection & other);
00073
00074
00075 void
00076 get (Block< Vector<Int> > & blockNGroup,
00077 Block< Vector<Int> > & blockStart,
00078 Block< Vector<Int> > & blockWidth,
00079 Block< Vector<Int> > & blockIncr,
00080 Block< Vector<Int> > & blockSpw) const;
00081
00082 protected:
00083
00084 void copyBlock (const Block <Vector<Int> > & src,
00085 Block <Vector<Int> > & to) const;
00086
00087 private:
00088
00089 Block< Vector<Int> > blockNGroup_p;
00090 Block< Vector<Int> > blockStart_p;
00091 Block< Vector<Int> > blockWidth_p;
00092 Block< Vector<Int> > blockIncr_p;
00093 Block< Vector<Int> > blockSpw_p;
00094 };
00095
00096
00097 class SelectChannelModifier : public RoviaModifier {
00098
00099 public:
00100
00101 SelectChannelModifier (Int nGroup, Int start, Int width, Int increment, Int spectralWindow);
00102 SelectChannelModifier (const Block< Vector<Int> > & blockNGroup,
00103 const Block< Vector<Int> > & blockStart,
00104 const Block< Vector<Int> > & blockWidth,
00105 const Block< Vector<Int> > & blockIncr,
00106 const Block< Vector<Int> > & blockSpw);
00107
00108 void apply (ROVisibilityIterator *) const;
00109
00110 private:
00111
00112 Bool channelBlocks_p;
00113 ChannelSelection channelSelection_p;
00114 Int increment_p;
00115 Int nGroup_p;
00116 Int spectralWindow_p;
00117 Int start_p;
00118 Int width_p;
00119
00120 void print (std::ostream & o) const;
00121 String toCsv (const Block< Vector<Int> > & bv) const;
00122 String toCsv (const Vector<Int> & v) const;
00123
00124 };
00125
00126 class SetIntervalModifier : public RoviaModifier {
00127
00128 public:
00129
00130 SetIntervalModifier (Double timeInterval);
00131 void apply (ROVisibilityIterator *) const;
00132
00133 private:
00134
00135 Double timeInterval_p;
00136
00137 void print (std::ostream & o) const;
00138 };
00139
00140
00141 class SetRowBlockingModifier : public RoviaModifier {
00142
00143 public:
00144
00145 SetRowBlockingModifier (Int nRows);
00146 void apply (ROVisibilityIterator *) const;
00147
00148 private:
00149
00150 Int nRows_p;
00151 Int nGroup_p;
00152 Int spectralWindow_p;
00153 Int start_p;
00154 Int width_p;
00155
00156 void print (std::ostream & o) const;
00157 };
00158
00159 class RoviaModifiers {
00160
00161 public:
00162
00163 ~RoviaModifiers ();
00164
00165 void add (RoviaModifier *);
00166 void apply (ROVisibilityIterator *);
00167 void clearAndFree ();
00168 RoviaModifiers transferModifiers ();
00169
00170 private:
00171
00172 typedef std::vector<RoviaModifier *> Data;
00173 Data data_p;
00174
00175 };
00176
00177 class SelectVelocityModifier : public RoviaModifier {
00178
00179 public:
00180
00181 SelectVelocityModifier (Int nChan, const MVRadialVelocity& vStart, const MVRadialVelocity& vInc,
00182 MRadialVelocity::Types rvType, MDoppler::Types dType, Bool precise);
00183 void apply (ROVisibilityIterator *) const;
00184
00185 private:
00186
00187 MDoppler::Types dType_p;
00188 Int nChan_p;
00189 Bool precise_p;
00190 MRadialVelocity::Types rvType_p;
00191 MVRadialVelocity vInc_p;
00192 MVRadialVelocity vStart_p;
00193
00194 virtual void print (std::ostream & o) const;
00195
00196 };
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252 class VlaDatum {
00253
00254 public:
00255
00256 typedef enum {Empty, Filling, Full, Reading} State;
00257
00258 VlaDatum (SubChunkPair);
00259 ~VlaDatum ();
00260
00261 SubChunkPair getSubChunkPair () const;
00262 VisBufferAsync * getVisBuffer ();
00263
00264 Bool isSubChunk (SubChunkPair) const;
00265
00266 VisBufferAsync * releaseVisBufferAsync ();
00267 void reset ();
00268
00269 protected:
00270
00271 private:
00272
00273 SubChunkPair subchunk_p;
00274 VisBufferAsync * visBuffer_p;
00275
00276
00277
00278 VlaDatum & operator= (const VlaDatum & other);
00279
00280 };
00281
00282 class VLAT;
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366 class AsynchronousInterface;
00367 class InterfaceController;
00368
00369 class VlaData {
00370
00371 public:
00372
00373 VlaData (Int maxNBuffers, async::Mutex & mutex);
00374 ~VlaData ();
00375
00376 Bool fillCanStart () const;
00377 void fillComplete (VlaDatum * datum);
00378 VlaDatum * fillStart (SubChunkPair, const ThreadTimes & fillStartTime);
00379 asyncio::ChannelSelection getChannelSelection () const;
00380 void initialize (const AsynchronousInterface *);
00381 void insertValidChunk (Int chunkNumber);
00382 void insertValidSubChunk (SubChunkPair);
00383 Bool isValidChunk (Int chunkNumber) const;
00384 Bool isValidSubChunk (SubChunkPair) const;
00385 void readComplete (SubChunkPair);
00386 VisBufferAsync * readStart (SubChunkPair);
00387 void resetBufferData ();
00388 void setNoMoreData ();
00389 void storeChannelSelection (const asyncio::ChannelSelection & channelSelection);
00390
00391
00392
00393
00394
00395
00396
00397
00398 protected:
00399
00400 private:
00401
00402 typedef std::queue<VlaDatum *> Data;
00403 typedef std::queue<Int> ValidChunks;
00404 typedef std::queue<SubChunkPair> ValidSubChunks;
00405
00406 class Timing {
00407 public:
00408 ThreadTimes fill1_p;
00409 ThreadTimes fill2_p;
00410 ThreadTimes fill3_p;
00411 DeltaThreadTimes fillCycle_p;
00412 DeltaThreadTimes fillOperate_p;
00413 DeltaThreadTimes fillWait_p;
00414 ThreadTimes read1_p;
00415 ThreadTimes read2_p;
00416 ThreadTimes read3_p;
00417 DeltaThreadTimes readCycle_p;
00418 DeltaThreadTimes readOperate_p;
00419 DeltaThreadTimes readWait_p;
00420 ThreadTimes timeStart_p;
00421 ThreadTimes timeStop_p;
00422 };
00423
00424 asyncio::ChannelSelection channelSelection_p;
00425 Data data_p;
00426 const AsynchronousInterface * interface_p;
00427 const Int MaxNBuffers_p;
00428 async::Mutex & mutex_p;
00429 Timing timing_p;
00430 mutable ValidChunks validChunks_p;
00431 mutable ValidSubChunks validSubChunks_p;
00432
00433
00434 Int clock (Int arg, Int base);
00435 String makeReport ();
00436
00437 Bool statsEnabled () const;
00438 void terminateSweep ();
00439
00441
00442 static Bool initializeLogging ();
00443
00444
00445
00446 VlaData (const VlaData & other);
00447 VlaData & operator= (const VlaData & other);
00448 };
00449
00450 class WriteData {
00451
00452 public:
00453
00454 WriteData (const SubChunkPair & subchunkPair) : subchunkPair_p (subchunkPair) {}
00455
00456 virtual ~WriteData () {}
00457
00458 SubChunkPair getSubChunkPair () const { return subchunkPair_p;}
00459 virtual void write (VisibilityIterator * vi) = 0;
00460
00461 private:
00462
00463 SubChunkPair subchunkPair_p;
00464
00465 };
00466
00467 template <typename Data>
00468 class WriteDataImpl : public WriteData {
00469 public:
00470
00471 typedef void (VisibilityIterator::* Setter) (const Data &);
00472
00473 WriteDataImpl (const SubChunkPair & subchunkPair,
00474 const Data & data,
00475 Setter setter)
00476 : WriteData (subchunkPair),
00477 data_p (),
00478 setter_p (setter)
00479 {
00480 data_p.assign (data);
00481 }
00482
00483 void
00484 write (VisibilityIterator * vi)
00485 {
00486 (vi ->* setter_p) (data_p);
00487 }
00488
00489 private:
00490
00491 Data data_p;
00492 Setter setter_p;
00493
00494 };
00495
00496 template <typename Data>
00497 WriteData *
00498 createWriteData (const SubChunkPair & subchunkPair,
00499 const Data & data,
00500 void (VisibilityIterator::* setter) (const Data &))
00501 {
00502 return new WriteDataImpl<Data> (subchunkPair, data, setter);
00503 }
00504
00505 template <typename Data>
00506 class WriteDataImpl2 : public WriteData {
00507 public:
00508
00509 typedef ROVisibilityIterator::DataColumn DataColumn;
00510 typedef void (VisibilityIterator::* Setter) (const Data &, DataColumn);
00511
00512 WriteDataImpl2 (const SubChunkPair & subchunkPair,
00513 const Data & data,
00514 DataColumn dataColumn,
00515 Setter setter)
00516 : WriteData (subchunkPair),
00517 data_p (),
00518 dataColumn_p (dataColumn),
00519 setter_p (setter)
00520 {
00521 data_p.assign (data);
00522 }
00523
00524 void
00525 write (VisibilityIterator * vi)
00526 {
00527 (vi ->* setter_p) (data_p, dataColumn_p);
00528 }
00529
00530 private:
00531
00532 Data data_p;
00533 DataColumn dataColumn_p;
00534 Setter setter_p;
00535 };
00536
00537 template <typename Data>
00538 WriteData *
00539 createWriteData (const SubChunkPair & subchunkPair,
00540 const Data & data,
00541 ROVisibilityIterator::DataColumn dataColumn,
00542 void (VisibilityIterator::* setter) (const Data &, ROVisibilityIterator::DataColumn))
00543 {
00544 return new WriteDataImpl2 <Data> (subchunkPair, data, dataColumn, setter);
00545 }
00546
00547 class AsynchronousInterface;
00548
00549 class WriteQueue {
00550
00551 public:
00552
00553 WriteQueue ();
00554 ~WriteQueue ();
00555
00556 WriteData * dequeue ();
00557 Bool empty (Bool alreadyLocked = False);
00558 void enqueue (WriteData * writeData);
00559
00560 void initialize (const AsynchronousInterface *);
00561
00562 void write (VisibilityIterator * vi);
00563
00564 private:
00565
00566 const AsynchronousInterface * interface_p;
00567 async::Mutex mutex_p;
00568 std::queue<WriteData *> queue_p;
00569 };
00570
00571
00572 class AsynchronousInterface {
00573
00574
00575
00576 public:
00577
00578
00579 AsynchronousInterface( const AsynchronousInterface& ) = delete;
00580 AsynchronousInterface& operator=( const AsynchronousInterface& ) = delete;
00581
00582 AsynchronousInterface (int maxNBuffers);
00583 ~AsynchronousInterface ();
00584
00585 void addModifier (asyncio::RoviaModifier * modifier);
00586 async::Mutex & getMutex () const;
00587
00588 VlaData * getVlaData ();
00589 VLAT * getVlat ();
00590 WriteQueue & getWriteQueue ();
00591 void initialize ();
00592 Bool isSweepTerminationRequested () const;
00593 Bool isLookaheadTerminationRequested () const;
00594 void notifyAllInterfaceChanged () const;
00595 void requestViReset ();
00596 pair<Bool, RoviaModifiers> resetVi ();
00597 void terminate ();
00598 void terminateLookahead ();
00599 void terminateSweep ();
00600 RoviaModifiers transferRoviaModifiers ();
00601 void viResetComplete ();
00602 Bool viResetRequested ();
00603 void waitForInterfaceChange (async::UniqueLock & uniqueLock) const;
00604
00605 static Bool initializeLogging ();
00606 static Bool logThis (Int level);
00607
00608 private:
00609
00610 mutable async::Condition interfaceDataChanged_p;
00611
00612
00613
00614 volatile Bool lookaheadTerminationRequested_p;
00615 mutable async::Mutex mutex_p;
00616 asyncio::RoviaModifiers roviaModifiers_p;
00617 volatile Bool sweepTerminationRequested_p;
00618
00619 volatile Bool viResetComplete_p;
00620 volatile Bool viResetRequested_p;
00621 VlaData vlaData_p;
00622 VLAT * vlat_p;
00623 WriteQueue writeQueue_p;
00624
00625 static Bool loggingInitialized_p;
00626 static Int logLevel_p;
00627 };
00628
00629 }
00630
00631 }
00632
00633 #endif