00001
00002
00003
00004
00005
00006
00007
00008 #ifndef VISIBILITYPROCESSING_H_
00009 #define VISIBILITYPROCESSING_H_
00010
00011 #include <casa/aips.h>
00012 #include <casa/BasicSL/String.h>
00013 #include <casa/Exceptions/Error.h>
00014 #include "VisBuffer.h"
00015 #include "VisibilityIterator.h"
00016 #include "UtilJ.h"
00017
00018 #include <memory>
00019 #include <tuple>
00020 #include <map>
00021 #include <set>
00022 #include <vector>
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056 namespace casa {
00057
// Forward declaration of casa::asyncio::PrefetchColumns.  This header only
// names the type in function declarations (the getPrefetchColumns members),
// so the complete definition is not required here.
namespace asyncio {
    class PrefetchColumns;
}
00061
00062 namespace vpf {
00063
// Forward declarations of classes defined further down in this header, so
// that the declarations preceding them can refer to them (as pointers and
// in friend declarations).
class VisibilityProcessor;
class VpContainer;
class VpEngine;
00067
00068 class SubchunkIndex {
00069
00070 friend class SubchunkIndex_Test;
00071
00072 public:
00073
00074 enum {Invalid = -1};
00075
00076 SubchunkIndex (Int chunkNumber = Invalid, Int subChunkNumber = Invalid, Int iteration = Invalid);
00077
00078
00079
00080
00081
00082 Bool operator< (const SubchunkIndex & other) const;
00083 Bool operator== (const SubchunkIndex & other) const { return ! (* this < other || other < * this);}
00084 Bool operator!= (const SubchunkIndex & other) const { return ! (* this == other);}
00085
00086 Int getChunkNumber () const;
00087 Int getIteration () const;
00088 Int getSubchunkNumber () const;
00089
00090 String toString () const;
00091
00092 private:
00093
00094 Int chunkNumber_p;
00095 Int iteration_p;
00096 Int subChunkNumber_p;
00097 };
00098
00099 class VbPtr : public std::shared_ptr<casa::VisBuffer> {
00100
00101 public:
00102
00103 VbPtr () : std::shared_ptr<casa::VisBuffer> () {}
00104 explicit VbPtr (casa::VisBuffer * vb) : std::shared_ptr<casa::VisBuffer> (vb) {}
00105
00106
00107
00108
00109 VbPtr & operator= (casa::VisBuffer * vb)
00110 {
00111 std::shared_ptr<casa::VisBuffer>::operator= (VbPtr (vb));
00112 return * this;
00113 }
00114 };
00115
00116 class VpPort {
00117
00118 friend class VpContainer;
00119 friend class VpPort_Test;
00120
00121 public:
00122
00123
00124
00125
00126
00127 typedef enum {Unknown, Input = 1, Output = 2, InOut = Input | Output} Type;
00128
00129 VpPort ();
00130 VpPort (VisibilityProcessor * vp, const String & name, Type type);
00131 ~VpPort () {}
00132
00133 Bool operator< (const VpPort & other) const;
00134 Bool operator== (const VpPort & other) const;
00135
00136 Bool empty () const;
00137 String getFullName () const;
00138 String getName () const;
00139 Type getType () const;
00140 Bool isConnectedInput () const;
00141 Bool isConnectedOutput () const;
00142
00143
00144
00145
00146 bool isType (Type t) const;
00147
00148
00149
00150 protected:
00151
00152 const VisibilityProcessor * getVp () const;
00153 VisibilityProcessor * getVp ();
00154 void setConnectedInput ();
00155 void setConnectedOutput ();
00156
00157 private:
00158
00159 Bool connectedInput_p;
00160 Bool connectedOutput_p;
00161 String name_p;
00162 VisibilityProcessor * visibilityProcessor_p;
00163 Type type_p;
00164
00165 };
00166
00167 class VpPorts : public std::vector<VpPort> {
00168
00169 friend class VisibilityProcessor;
00170 friend class VpPorts_Test;
00171
00172 public:
00173
00174 Bool contains (const String & name) const;
00175 Bool contains (const VpPort & port) const;
00176 VpPort get (const String & name) const;
00177 String toString () const;
00178
00179 protected:
00180
00181 VpPort & getRef (const String & name);
00182
00183 template <typename Itr>
00184 static
00185 Itr
00186 find(const String & name, Itr begin, Itr end)
00187 {
00188 Itr i;
00189
00190 for (i = begin; i != end; i++){
00191 if (i->getName() == name){
00192 break;
00193 }
00194 }
00195
00196 return i;
00197 }
00198
00199 };
00200
// NOTE(review): this sits inside namespace casa::vpf, so it declares
// casa::vpf::asyncio::PrefetchColumns -- a type distinct from the
// casa::asyncio::PrefetchColumns forward-declared near the top of the file.
// The getPrefetchColumns declarations in this header spell out
// casa::asyncio::PrefetchColumns in full, so they do not refer to this one.
// Confirm whether this nested declaration is still needed.
namespace asyncio { class PrefetchColumns; }
00204
00205
00206 class VpData: public std::map<VpPort, VbPtr> {
00207
00208 friend class VpData_Test;
00209
00210 public:
00211
00212 VpData ();
00213 VpData (const VpPort & port, VbPtr);
00214
00215 void add (const VpPort & port, VbPtr);
00216
00217
00218
00219
00220 VpData getSelection (const VpPorts &, bool missingIsOk = False) const;
00221 String getNames () const;
00222 };
00223
00224
00225 class VisibilityProcessor {
00226
00227 friend class VpContainer;
00228 friend class WriterVp;
00229
00230 public:
00231
00232 VisibilityProcessor( const VisibilityProcessor& ) = delete;
00233 VisibilityProcessor& operator=( const VisibilityProcessor& ) = delete;
00234
00235 typedef enum {
00236 Normal,
00237 RepeatChunk
00238 } ChunkCode;
00239
00240 typedef enum {
00241 Subchunk,
00242 EndOfChunk,
00243 EndOfData
00244 } ProcessingType;
00245
00246 typedef std::tuple <ChunkCode, VpData> ProcessingResult;
00247
00248 VisibilityProcessor ();
00249 VisibilityProcessor (const String & name,
00250 const vector<String> & inputNames,
00251 const vector<String> & outputNames = vector<String>(),
00252 Bool makeIoPorts = False);
00253 virtual ~VisibilityProcessor () {}
00254
00255
00256
00257 void chunkStart (const SubchunkIndex &);
00258
00259
00260
00261
00262 ProcessingResult doProcessing (ProcessingType processingType,
00263 VpData & inputData,
00264 VpEngine * vpEngine,
00265 const SubchunkIndex & subChunkIndex);
00266
00267
00268
00269 const VpContainer * getContainer () const { return NULL;}
00270
00271
00272
00273
00274 String getFullName () const;
00275
00276
00277
00278 VpPort getInput (const String & name) const;
00279
00280
00281
00282
00283 VpPorts getInputs (Bool connectedOnly = False) const;
00284
00285
00286
00287 String getName () const;
00288
00289
00290
00291 Int getNSubchunksProcessed () const;
00292
00293
00294
00295
00296 Int getNSubchunksUniqueProcessed () const;
00297
00298
00299
00300 VpPort getOutput (const String & name) const;
00301
00302
00303
00304
00305 VpPorts getOutputs (Bool connectedOnly = False) const;
00306
00307
00308
00309
00310 virtual casa::asyncio::PrefetchColumns getPrefetchColumns () const;
00311
00312
00313
00314
00315 void processingStart ();
00316
00317
00318
00319
00320 void validate ();
00321
00322 protected:
00323
00324
00325
00326
00327
00328
00329
00330
00331 virtual void chunkStartImpl (const SubchunkIndex &) {}
00332
00333
00334
00335
00336 VpPorts definePorts (const vector<String> & portNames, VpPort::Type type, const String & typeName);
00337
00338
00339
00340
00341
00342
00343 virtual ProcessingResult doProcessingImpl (ProcessingType processingType,
00344 VpData & inputData,
00345 const SubchunkIndex & subChunkIndex) = 0;
00346
00347
00348
00349
00350 VpPorts portsUnconnected (const VpPorts & ports, Bool (VpPort::* isConnected) () const,
00351 const vector<String> & except = vector<String> ()) const;
00352
00353
00354
00355
00356 virtual void processingStartImpl () {}
00357
00358
00359
00360 void throwIfAnyInputsUnconnected (const vector<String> & exceptThese = vector<String> ()) const;
00361 void throwIfAnyInputsUnconnectedExcept (const String & exceptThisOne) const;
00362 void throwIfAnyOutputsUnconnected (const vector<String> & exceptThese = vector<String> ()) const;
00363 void throwIfAnyOutputsUnconnectedExcept (const String & exceptThisOne) const;
00364 void throwIfAnyPortsUnconnected () const;
00365
00366
00367
00368
00369 virtual void validateImpl () = 0;
00370
00371 private:
00372
00373 VpPort & getInputRef (const String & name);
00374 VpPort & getOutputRef (const String & name);
00375 void setContainer (const VpContainer *);
00376
00377 ROVisibilityIterator * getVi ();
00378 VpEngine * getVpEngine();
00379
00380 const VpContainer * container_p;
00381 String name_p;
00382 Int nSubchunks_p;
00383 Int nSubchunksUnique_p;
00384 VpEngine * vpEngine_p;
00385 VpPorts vpInputs_p;
00386 VpPorts vpOutputs_p;
00387 };
00388
00389 ostream & operator<< (ostream & os, const VisibilityProcessor::ProcessingType & processingType);
00390 String toString (VisibilityProcessor::ProcessingType p);
00391
00392 class VisibilityProcessorStub : public VisibilityProcessor {
00393
00394
00395
00396
00397 public:
00398
00399 VisibilityProcessorStub (const String & name)
00400 : VisibilityProcessor (name, utilj::Strings(), utilj::Strings())
00401 {}
00402
00403 ProcessingResult doProcessingImpl (ProcessingType ,
00404 VpData & ,
00405 const SubchunkIndex & );
00406
00407 void validateImpl ();
00408
00409
00410 };
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432 class SplitterVp : public VisibilityProcessor {
00433
00434 public:
00435
00436 SplitterVp (const String & name,
00437 const String & inputName,
00438 const vector<String> & outputNames);
00439
00440 ~SplitterVp () {}
00441
00442 protected:
00443
00444 ProcessingResult doProcessingImpl (ProcessingType processingType ,
00445 VpData & inputData,
00446 const SubchunkIndex & subChunkIndex);
00447
00448 void validateImpl ();
00449 };
00450
00451 class WriterVp: public VisibilityProcessor {
00452
00453 public:
00454
00455
00456
00457
00458
00459
00460
00461 WriterVp (const String & name,
00462 VisibilityIterator * vi = NULL,
00463 Bool advanceVi = False,
00464 const String & input = "In",
00465 const String & output = "Out");
00466
00467
00468
00469
00470
00471 Bool setDisableOutput (Bool disableIt);
00472
00473 protected:
00474
00475 ProcessingResult doProcessingImpl (ProcessingType processingType,
00476 VpData & inputData,
00477 const SubchunkIndex & subChunkIndex);
00478
00479 void validateImpl ();
00480
00481 private:
00482
00483 Bool advanceVi_p;
00484
00485 Bool disableOutput_p;
00486 VisibilityIterator * vi_p;
00487 };
00488
00489 class VpContainer : public VisibilityProcessor {
00490
00491 friend class VisibilityProcessing;
00492
00493 public:
00494
00495
00496
00497
00498
00499 VpContainer (const String & name,
00500 const vector<String> & inputs = vector<String> (1, "In"),
00501 const vector<String> & outputs = vector<String> ());
00502
00503 virtual ~VpContainer () {}
00504
00505
00506
00507 virtual void add (VisibilityProcessor * processor);
00508
00509
00510
00511
00512 virtual void connect (VisibilityProcessor * sourceVp, const String & sourcePortName,
00513 VisibilityProcessor * sinkVp, const String & sinkPortName);
00514 virtual void connect (const String & sourcePortName,
00515 VisibilityProcessor * sinkVp, const String & sinkPortName);
00516 virtual void connect (VisibilityProcessor * sourceVp, const String & sourcePortName,
00517 const String & sinkPortName);
00518
00519 virtual void chunkStart (const SubchunkIndex & sci);
00520
00521
00522
00523
00524 virtual void fillWithSequence (VisibilityProcessor * first, ...);
00525
00526
00527
00528 virtual casa::asyncio::PrefetchColumns getPrefetchColumns () const;
00529
00530 protected:
00531
00532 typedef vector<VisibilityProcessor *> VPs;
00533 typedef VPs::const_iterator const_iterator;
00534 typedef VPs::iterator iterator;
00535
00536 iterator begin();
00537 const_iterator begin() const;
00538
00539 Bool contains (const VisibilityProcessor *) const;
00540 virtual ProcessingResult doProcessingImpl (ProcessingType processingType,
00541 VpData & inputData,
00542 const SubchunkIndex & subChunkIndex);
00543 Bool empty () const;
00544 iterator end();
00545 const_iterator end() const;
00546 virtual void processingStartImpl ();
00547 size_t size() const;
00548 virtual void validateImpl ();
00549
00550 private:
00551
00552 typedef std::map<VpPort, VpPort> Network;
00553 typedef std::set<VpPort> NetworkReverse;
00554 typedef std::tuple<VisibilityProcessor *, VpData> ReadyVpAndData;
00555
00556 class VpSet : public std::set<VisibilityProcessor *> {
00557 public:
00558
00559 template <typename In>
00560 VpSet (In begin, In end) : std::set<VisibilityProcessor *> (begin, end) {}
00561 String getNames () const;
00562 };
00563
00564 Network network_p;
00565 NetworkReverse networkReverse_p;
00566
00567 VPs vps_p;
00568
00569 ReadyVpAndData findReadyVp (VpSet & vpsWaiting, VpData & inputs, bool flushing) const;
00570 ReadyVpAndData findReadyVpFlushing (VpSet & vpsWaiting, VpData & inputs) const;
00571 ReadyVpAndData findReadyVpNormal (VpSet & vpsWaiting, VpData & inputs) const;
00572 bool follows (const VisibilityProcessor * a, const VisibilityProcessor * b) const;
00573 bool followsSet (const VisibilityProcessor * a, const VpSet & vpSet) const;
00574 void orderContents ();
00575 void remapPorts (VpData & data, const VisibilityProcessor *);
00576 pair<VpPort,VpPort> validateConnectionPorts (VisibilityProcessor * sourceVp,
00577 const String & sourcePortName,
00578 VisibilityProcessor * sinkVp,
00579 const String & sinkPortName);
00580 };
00581
00582 class VpEngine {
00583
00584 friend class VisibilityProcessor;
00585
00586 public:
00587
00588 VpEngine () : vi_p (NULL) {}
00589
00590
00591
00592
00593
00594 void process (VisibilityProcessor & processor,
00595 ROVisibilityIterator & vi,
00596 const String & inputPortName);
00597
00598 void process (VisibilityProcessor & processor,
00599 ROVisibilityIterator & vi,
00600 const VpPort & inputPort = VpPort ());
00601
00602 static Int getLogLevel ();
00603 static void log (const String & format, ...);
00604 static String getAipsRcBase ();
00605
00606 private:
00607
00608 ROVisibilityIterator * vi_p;
00609
00610 static Int logLevel_p;
00611 static LogIO * logIo_p;
00612 static Bool loggingInitialized_p;
00613 static LogSink * logSink_p;
00614
00615 static Bool initializeLogging ();
00616
00617 ROVisibilityIterator * getVi ();
00618
00619 };
00620
00621 }
00622
00623 }
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653 #endif