VisibilityProcessing.h

Go to the documentation of this file.
00001 /*
00002  * VisibilityProcessing.h
00003  *
00004  *  Created on: Feb 8, 2011
00005  *      Author: jjacobs
00006  */
00007 
00008 #ifndef VISIBILITYPROCESSING_H_
00009 #define VISIBILITYPROCESSING_H_
00010 
00011 #include <casa/aips.h>
00012 #include <casa/BasicSL/String.h>
00013 #include <casa/Exceptions/Error.h>
00014 #include "VisBuffer.h"
00015 #include "VisibilityIterator.h"
00016 #include "UtilJ.h"
00017 
00018 #include <memory>
00019 #include <tuple>
00020 #include <map>
00021 #include <set>
00022 #include <vector>
00023 
00024 
00025 /*
00026 
00027 Visibility Processing Framework Class Summary
00028 =============================================
00029 
00030 SubchunkIndex - Index of a subchunk.  Consists of the chunk number,
00031                 the subchunk number and the iteration number.  All three
00032                 are zero-based.  The iteration number is nonzero if a chunk
00033                 is reprocessed. A subchunk is used to identify a VisBuffer
00034                 relative to the VisibilityIterator managed by the VpEngine.
00035 VbPtr - Smart pointer of a VisBuffer.
00036 VisibilityProcessor - A visibility processing node in data flow graph
00037     VisibilityProcessorStub - A do-nothing node used for unit testing
00038     VpContainer - A VP which contains a graph of VPs.  It handles moving
00039                   data between its input and output ports to the appropriate
00040                   input and output ports of the data flow graph it contains.
00041     SplitterVp - Has a single input port and outputs identical copies of it
00042                  through its output ports.
00043     WriterVp - Takes an input and writes it out to the VisibilityIterator provided
00044                when it was constructed.  Optionally passes the input data to its
00045                output port.
00046 VpEngine - Object that executes a data flow graph of VisibilityProcessors on data
00047            accessed via a VisibilityIterator.
00048 
00049 VpPort - A data port into or out of (or both) a VisibilityProcessor
00050 VpPorts - A collection of VpPort objects
00051 VpData - A collection of visibility data; it works like an associative array
00052          pairing a VpPort with a VisBuffer.
00053 
00054 */
00055 
00056 namespace casa {
00057 
00058 namespace asyncio { // forward declarations living in casa::asyncio
00059     class PrefetchColumns; // named in this header only as the return type of getPrefetchColumns
00060 };
00061 
00062 namespace vpf {
00063 
00064 class VisibilityProcessor; // forward declaration: VpPort stores a VisibilityProcessor *
00065 class VpContainer;         // forward declaration: VisibilityProcessor stores a const VpContainer *
00066 class VpEngine;            // forward declaration: VisibilityProcessor::doProcessing takes a VpEngine *
00067 
00068 class SubchunkIndex {
00069     // Identifies one subchunk by the triple (chunk, subchunk, iteration).
00070     friend class SubchunkIndex_Test;
00071 
00072 public:
00073 
00074     enum {Invalid = -1}; // default for every constructor argument
00075 
00076     SubchunkIndex (Int chunkNumber = Invalid, Int subChunkNumber = Invalid, Int iteration = Invalid);
00077 
00078     // Ordering and equality
00079     //
00080     // operator< orders lexicographically by chunk, subchunk and iteration;
00081     // equality and inequality are derived purely from operator<.
00082     Bool operator< (const SubchunkIndex & other) const;
00083     Bool operator== (const SubchunkIndex & other) const { return ! (* this < other) && ! (other < * this);}
00084     Bool operator!= (const SubchunkIndex & other) const { return (* this < other) || (other < * this);}
00085 
00086     Int getChunkNumber () const;
00087     Int getSubchunkNumber () const;
00088     Int getIteration () const;
00089 
00090     String toString () const;
00091 
00092 private:
00093 
00094     Int chunkNumber_p;        // -1 (Invalid) marks an invalid value
00095     Int iteration_p;          // -1 (Invalid) marks an invalid value
00096     Int subChunkNumber_p;
00097 };
00098 
00099 class VbPtr : public std::shared_ptr<casa::VisBuffer> {
00100     // Shared-ownership handle to a VisBuffer, layered on std::shared_ptr.
00101 public:
00102 
00103     VbPtr () {}
00104     explicit VbPtr (casa::VisBuffer * vb) : std::shared_ptr<casa::VisBuffer> (vb) {}
00105 
00106     // Assignment from a plain pointer.  The VbPtr takes over ownership of the
00107     // VisBuffer, so the caller must make sure delete is never called on it.
00108 
00109     VbPtr & operator= (casa::VisBuffer * vb)
00110     {
00111         this->reset (vb);
00112         return * this;
00113     }
00114 };
00115 
00116 class VpPort {
00117     // A named data port belonging to a VisibilityProcessor.
00118     friend class VpContainer;
00119     friend class VpPort_Test;
00120 
00121 public:
00122 
00123     // Normally ports are either input or output ports.  However, the ports
00124     // of a VpContainer do double duty, serving as an input to the container and
00125     // as an output feeding the input of a contained VP, or vice versa.
00126     // Input and Output are distinct bits, so InOut (Input | Output) has both set.
00127     typedef enum {Unknown, Input = 1, Output = 2, InOut = Input | Output} Type;
00128 
00129     VpPort ();
00130     VpPort (VisibilityProcessor * vp, const String & name, Type type);
00131     ~VpPort () {}
00132     // operator< makes VpPort usable as a std::map / std::set key.
00133     Bool operator< (const VpPort & other) const;
00134     Bool operator== (const VpPort & other) const;
00135 
00136     Bool empty () const; // NOTE(review): presumably true for a default-constructed port -- confirm
00137     String getFullName () const; // returns Vp0.Vp1...VpN.portName
00138     String getName () const; // returns portName
00139     Type getType () const; // Returns the port's type as something from the Type enum
00140     Bool isConnectedInput () const; // True if port has been connected up as an input
00141     Bool isConnectedOutput () const; // True if port has been connected up as an output
00142 
00143     // Used to check the type of the port as defined in the Type enum.  InOut ports
00144     // return true for both Input and Output types.
00145 
00146     bool isType (Type t) const;
00147 
00148     //String toString() const;
00149 
00150 protected:
00151     // Accessors and mutators reserved for subclasses and the friend classes above.
00152     const VisibilityProcessor * getVp () const;
00153     VisibilityProcessor * getVp ();
00154     void setConnectedInput ();
00155     void setConnectedOutput ();
00156 
00157 private:
00158 
00159     Bool connectedInput_p;  // reported by isConnectedInput -- presumably set by setConnectedInput
00160     Bool connectedOutput_p; // reported by isConnectedOutput -- presumably set by setConnectedOutput
00161     String name_p;          // port name without the VP prefix
00162     VisibilityProcessor * visibilityProcessor_p; // [use] -- presumably non-owning; confirm
00163     Type type_p;
00164 
00165 };
00166 
00167 class VpPorts : public std::vector<VpPort> {
00168     // A std::vector of VpPort objects that can also be searched by port name.
00169     friend class VisibilityProcessor;
00170     friend class VpPorts_Test;
00171 
00172 public:
00173 
00174     Bool contains (const String & name) const;
00175     Bool contains (const VpPort & port) const;
00176     VpPort get (const String & name) const;
00177     String toString () const;
00178 
00179 protected:
00180 
00181     VpPort & getRef (const String & name);
00182 
00183     // Linear scan of [begin, end) for the first element whose getName()
00184     // equals name.  Yields end when no element matches.
00185     template <typename Itr>
00186     static
00187     Itr
00188     find(const String & name, Itr begin, Itr end)
00189     {
00190         Itr cursor = begin;
00191 
00192         while (cursor != end && ! (cursor->getName() == name)){
00193             ++ cursor;
00194         }
00195 
00196         return cursor;
00197     }
00198 
00199 };
00200 
00201 namespace asyncio { // NOTE(review): opened inside vpf, so this is casa::vpf::asyncio
00202     class PrefetchColumns; // a different type from casa::asyncio::PrefetchColumns, which getPrefetchColumns returns
00203 }
00204 
00205 
00206 class VpData: public std::map<VpPort, VbPtr> {
00207     // Visibility data keyed by port: a std::map from VpPort to VbPtr.
00208     friend class VpData_Test;
00209 
00210 public:
00211 
00212     VpData ();
00213     VpData (const VpPort & port, VbPtr); // presumably starts the collection with this one pair -- confirm
00214 
00215     void add (const VpPort & port, VbPtr); // Adds a (port,VbPtr) to the collection
00216 
00217     // Returns the (port,VbPtr) pairs for the requested set of ports.  An exception
00218     // is thrown if a requested port is not present unless missingIsOk is set to True.
00219 
00220     VpData getSelection (const VpPorts &, bool missingIsOk = False) const;
00221     String getNames () const; // Returns a comma-separated list of the port names.
00222 };
00223 
00224 
00225 class VisibilityProcessor {
00226     // Abstract base class for a visibility processing node in the data flow graph.
00227     friend class VpContainer;
00228     friend class WriterVp;
00229 
00230 public:
00231     // VPs cannot be copied.
00232     VisibilityProcessor( const VisibilityProcessor& ) = delete;
00233     VisibilityProcessor& operator=( const VisibilityProcessor& ) = delete;
00234     // Status component of a ProcessingResult.
00235     typedef enum {
00236         Normal,
00237         RepeatChunk  // NOTE(review): presumably requests reprocessing of the chunk -- confirm
00238     } ChunkCode;
00239     // Context in which doProcessing / doProcessingImpl is being called.
00240     typedef enum {
00241         Subchunk,    // Normal processing of a subchunk
00242         EndOfChunk,  // Called after all subchunks of a chunk have been processed
00243         EndOfData    // Called after all chunks have been processed
00244     } ProcessingType;
00245     // What a processing call hands back: a status code plus the output data.
00246     typedef std::tuple <ChunkCode, VpData> ProcessingResult;
00247 
00248     VisibilityProcessor ();
00249     VisibilityProcessor (const String & name,
00250                          const vector<String> & inputNames,
00251                          const vector<String> & outputNames = vector<String>(),
00252                          Bool makeIoPorts = False);
00253     virtual ~VisibilityProcessor () {}
00254 
00255     // chunkStart is called to inform the VP that a new chunk is starting.
00256     // Not virtual; VpContainer declares its own chunkStart, which hides this one.
00257     void chunkStart (const SubchunkIndex &);
00258 
00259     // Called to cause the VP to process the provided inputs.  It will be called
00260     // in three different contexts as indicated by the ProcessingType.
00261 
00262     ProcessingResult doProcessing (ProcessingType processingType,
00263                                    VpData & inputData,
00264                                    VpEngine * vpEngine,
00265                                    const SubchunkIndex & subChunkIndex);
00266 
00267     // Returns a pointer to the containing VP or NULL if this VP is top-level.
00268     // Yields container_p; assumes the constructors initialize it to NULL -- TODO confirm.
00269     const VpContainer * getContainer () const { return container_p;}
00270 
00271     // The full name of a VP is a dotted list of the names of all the containing
00272     // VPs ending with the name of this VP (e.g., vp0.vp1...vpN.thisVp).
00273 
00274     String getFullName () const;
00275 
00276     // Returns the input port having the specified name.  Exception if port is undefined.
00277 
00278     VpPort getInput (const String & name) const;
00279 
00280     // Returns a collection of the input ports for this VP; optionally only the
00281     // connected ports are returned.
00282 
00283     VpPorts getInputs (Bool connectedOnly = False) const;
00284 
00285     // Returns the name of this VP
00286 
00287     String getName () const;
00288 
00289     // Returns the number of Subchunks processed (mainly for testing)
00290 
00291     Int getNSubchunksProcessed () const;
00292 
00293     // Returns the number of unique Subchunks (i.e., iteration ignored) processed.
00294     // (mainly for testing)
00295 
00296     Int getNSubchunksUniqueProcessed () const;
00297 
00298     // Returns the output port having the specified name.  Exception if port is undefined.
00299 
00300     VpPort getOutput (const String & name) const;
00301 
00302     // Returns a collection of the output ports for this VP; optionally only the
00303     // connected ports are returned.
00304 
00305     VpPorts getOutputs (Bool connectedOnly = False) const;
00306 
00307     // Returns the collection of columns that need to be prefetched if this node
00308     // is used with async I/O.
00309 
00310     virtual casa::asyncio::PrefetchColumns getPrefetchColumns () const;
00311 
00312     // Called by the framework when the processing is about to begin (i.e., prior
00313     // to the first VisBuffer being fed into the graph).
00314 
00315     void processingStart ();
00316 
00317     // Called to ask the VP to check its validity (i.e., are all needed inputs connected,
00318     // etc.).
00319 
00320     void validate ();
00321 
00322 protected:
00323 
00324     // The public API contains many methods that are not virtual.  However, where subclass-specific
00325     // behavior is potentially useful, a corresponding xxxImpl method is provided.  This allows the
00326     // framework to perform certain required housekeeping options while allowing the subclass
00327     // to perform custom operations.
00328 
00329     // Called on the object when a new chunk is about to be started.  Does nothing by default.
00330 
00331     virtual void chunkStartImpl (const SubchunkIndex &) {}
00332 
00333 
00334     // Defines the set of possible input ports for this VP
00335     // NOTE(review): takes a VpPort::Type, so it presumably builds output ports as well -- confirm.
00336     VpPorts definePorts (const vector<String> & portNames, VpPort::Type type, const String & typeName);
00337 
00338     // Requests processing of the provided (possibly empty) input data.  This is called on each
00339     // subchunk (then inputData will be nonempty) and at the end of a chunk and the end of the
00340     // entire data set.  These last two call types allow the VP to output any data that it might have
00341     // been accumulating across multiple subchunks, etc.
00342     // Pure virtual: every concrete VP must implement it.
00343     virtual ProcessingResult doProcessingImpl (ProcessingType processingType,
00344                                                VpData & inputData,
00345                                                const SubchunkIndex & subChunkIndex) = 0;
00346 
00347     // Returns a collection of the ports that are not connected using the provided connection
00348     // method; some ports may also be excluded from this list by name.
00349     // isConnected is a pointer to a const VpPort member function returning Bool.
00350     VpPorts portsUnconnected (const VpPorts & ports, Bool (VpPort::* isConnected) () const,
00351                               const vector<String> & except = vector<String> ()) const;
00352 
00353     // Called when data processing is about to begin; this allows the VP to perform any
00354     // initialization that it desires now that it is completely connected into the graph.
00355     // Does nothing by default.
00356     virtual void processingStartImpl () {}
00357 
00358     // Methods to ease the validation process.
00359 
00360     void throwIfAnyInputsUnconnected (const vector<String> & exceptThese = vector<String> ()) const;
00361     void throwIfAnyInputsUnconnectedExcept (const String & exceptThisOne) const;
00362     void throwIfAnyOutputsUnconnected (const vector<String> & exceptThese = vector<String> ()) const;
00363     void throwIfAnyOutputsUnconnectedExcept (const String & exceptThisOne) const;
00364     void throwIfAnyPortsUnconnected () const;
00365 
00366     // Called to allow the node to validate its initial state.  An AipsError should be thrown if
00367     // this node decides that it is invalid.
00368     // Pure virtual: every concrete VP must implement it.
00369     virtual void validateImpl () = 0;
00370 
00371 private:
00372     // Mutable access to this VP's own ports; reachable by the friend classes above.
00373     VpPort & getInputRef (const String & name);
00374     VpPort & getOutputRef (const String & name);
00375     void setContainer (const VpContainer *); // presumably records the enclosing container in container_p -- confirm
00376 
00377     ROVisibilityIterator * getVi (); // returns the VI used for this data set
00378     VpEngine * getVpEngine(); // returns the engine executing this VP
00379 
00380     const VpContainer * container_p; // [use]; value reported by getContainer
00381     String name_p; // name of this VP
00382     Int nSubchunks_p; // number of subchunks processed
00383     Int nSubchunksUnique_p; // number of unique subchunks processed
00384     VpEngine * vpEngine_p; // pointer to VpEngine processing this VP (can be null)
00385     VpPorts vpInputs_p; // collection of input ports
00386     VpPorts vpOutputs_p; // collection of output ports
00387 };
00388 
00389 ostream & operator<< (ostream & os, const VisibilityProcessor::ProcessingType & processingType); // stream insertion for a ProcessingType
00390 String toString (VisibilityProcessor::ProcessingType p); // String form of a ProcessingType (presumably the enumerator name -- confirm)
00391 
00392 class VisibilityProcessorStub : public VisibilityProcessor {
00393 
00394     // Used to allow definition of a VP variable for use in testing.
00395     // Should never be actually operated on.
00396 
00397 public:
00398     // Forwards the name with default-constructed utilj::Strings for both the input and output names.
00399     VisibilityProcessorStub (const String & name)
00400     : VisibilityProcessor (name, utilj::Strings(), utilj::Strings())
00401     {}
00402     // Implementations of the two pure virtuals of VisibilityProcessor.
00403     ProcessingResult doProcessingImpl (ProcessingType /*processingType*/,
00404                                        VpData & /*inputData*/,
00405                                        const SubchunkIndex & /*subChunkIndex*/);
00406 
00407     void validateImpl ();
00408 
00409 
00410 };
00411 
00412 //class SimpleVp: public VisibilityProcessor {
00413 //
00414 //public:
00415 //
00416 //    SimpleVp (const String & name, const String & input = "In", const String & output = "");
00417 //    virtual ~SimpleVp ();
00418 //
00419 //protected:
00420 //
00421 //    class SimpleResult : public std::tuple<ChunkCode, VisBuffer *> {};
00422 //
00423 //    virtual ProcessingResult doProcessingImpl (ProcessingType processingType,
00424 //                                               VpData & inputData,
00425 //                                               const SubchunkIndex & subChunkIndex);
00426 //    virtual void validateImpl ();
00427 //
00428 //private:
00429 //
00430 //};
00431 
00432 class SplitterVp : public VisibilityProcessor {
00433     // VP constructed with a single input name and a list of output names.
00434 public:
00435 
00436     SplitterVp (const String & name,
00437                 const String & inputName,
00438                 const vector<String> & outputNames);
00439 
00440     ~SplitterVp () {}
00441 
00442 protected:
00443     // Implementations of the two pure virtuals of VisibilityProcessor.
00444     ProcessingResult doProcessingImpl (ProcessingType processingType ,
00445                                        VpData & inputData,
00446                                        const SubchunkIndex & subChunkIndex);
00447 
00448     void validateImpl ();
00449 };
00450 
00451 class WriterVp: public VisibilityProcessor {
00452     // VP that writes its input out through a VisibilityIterator.
00453 public:
00454 
00455     // Creates a WriterVp node.  If the vi argument is NULL then the
00456     // flow graph's VI is used.  The advanceVi argument is used to
00457     // direct the node to advance the VI after each write (i.e., perform
00458     // a vi++ operation); advancing the flow graph's VI will cause a
00459     // run time exception.
00460     // The port names default to "In" and "Out".
00461     WriterVp (const String & name,
00462               VisibilityIterator * vi = NULL,
00463               Bool advanceVi = False,
00464               const String & input = "In",
00465               const String & output = "Out");
00466 
00467     // This paradoxical method allows the user to create a single data flow graph
00468     // and then programmatically decide at run time whether data should be actually
00469     // output on this particular run.
00470     // NOTE(review): the Bool result is presumably the previous setting -- confirm.
00471     Bool setDisableOutput (Bool disableIt);
00472 
00473 protected:
00474     // Implementations of the two pure virtuals of VisibilityProcessor.
00475     ProcessingResult doProcessingImpl (ProcessingType processingType,
00476                                        VpData & inputData,
00477                                        const SubchunkIndex & subChunkIndex);
00478 
00479     void validateImpl ();
00480 
00481 private:
00482 
00483     Bool advanceVi_p; // true if VI is to be advanced after each write.
00484                       // N.B., advancing the flow graph's VI is prohibited
00485     Bool disableOutput_p; // true if output is disabled.
00486     VisibilityIterator * vi_p; // VI to use for output.
00487 };
00488 
00489 class VpContainer : public VisibilityProcessor {
00490     // A VP that holds other VPs plus the connections between their ports.
00491     friend class VisibilityProcessing; // NOTE(review): no class of this name is declared in this header
00492 
00493 public:
00494 
00495     // Creates a VpContainer object providing the specified inputs and outputs.
00496     // These inputs and outputs will potentially be connected to the inputs and
00497     // outputs of the VPs that are contained in the container.
00498     // By default there is one input named "In" and no outputs.
00499     VpContainer (const String & name,
00500                  const vector<String> & inputs = vector<String> (1, "In"),
00501                  const vector<String> & outputs = vector<String> ());
00502 
00503     virtual ~VpContainer () {}
00504 
00505     // Adds a VP to the container.  Exception if VP is already in the container.
00506 
00507     virtual void add (VisibilityProcessor * processor);
00508 
00509     // Connects the specified output to the specified input.  The VP pointer may be
00510     // omitted if the port belongs to the container.
00511 
00512     virtual void connect (VisibilityProcessor * sourceVp, const String &  sourcePortName,
00513                           VisibilityProcessor * sinkVp, const String &  sinkPortName);
00514     virtual void connect (const String &  sourcePortName,
00515                           VisibilityProcessor * sinkVp, const String &  sinkPortName);
00516     virtual void connect (VisibilityProcessor * sourceVp, const String &  sourcePortName,
00517                           const String &  sinkPortName);
00518     // NOTE(review): VisibilityProcessor::chunkStart is not virtual, so this hides it rather than overriding it.
00519     virtual void chunkStart (const SubchunkIndex & sci);
00520 
00521     // Fills the container with the specified set of VPs.  The container must be
00522     // empty prior to this call.
00523 
00524     virtual void fillWithSequence (VisibilityProcessor * first, ...); // Last one NULL
00525 
00526     // Returns the columns that are required to be prefetched if async I/O is used.
00527 
00528     virtual casa::asyncio::PrefetchColumns getPrefetchColumns () const;
00529 
00530 protected:
00531 
00532     typedef vector<VisibilityProcessor *> VPs; // VPs are used (not owned)
00533     typedef VPs::const_iterator const_iterator;
00534     typedef VPs::iterator iterator;
00535     // Container-style access; the iterator types are those of VPs.
00536     iterator begin();
00537     const_iterator begin() const;
00538 
00539     Bool contains (const VisibilityProcessor *) const;
00540     virtual ProcessingResult doProcessingImpl (ProcessingType processingType,
00541                                                VpData & inputData,
00542                                                const SubchunkIndex & subChunkIndex);
00543     Bool empty () const;
00544     iterator end();
00545     const_iterator end() const;
00546     virtual void processingStartImpl ();
00547     size_t size() const;
00548     virtual void validateImpl ();
00549 
00550 private:
00551 
00552     typedef std::map<VpPort, VpPort> Network;
00553     typedef std::set<VpPort> NetworkReverse;
00554     typedef std::tuple<VisibilityProcessor *, VpData> ReadyVpAndData;
00555     // A std::set of VP pointers that can be built from any iterator range.
00556     class VpSet : public std::set<VisibilityProcessor *> {
00557     public:
00558 
00559         template <typename In>
00560         VpSet (In begin, In end) : std::set<VisibilityProcessor *> (begin, end) {}
00561         String getNames () const;
00562     };
00563 
00564     Network network_p; // connections between the ports of the connected nodes
00565     NetworkReverse networkReverse_p; // connections of contents except indexed in
00566                                      // backwards order.
00567     VPs vps_p; // the VPs contained by this container.
00568     // Private helpers; their definitions are not part of this header.
00569     ReadyVpAndData findReadyVp (VpSet & vpsWaiting, VpData & inputs, bool flushing) const;
00570     ReadyVpAndData findReadyVpFlushing (VpSet & vpsWaiting, VpData & inputs) const;
00571     ReadyVpAndData findReadyVpNormal (VpSet & vpsWaiting, VpData & inputs) const;
00572     bool follows (const VisibilityProcessor * a, const VisibilityProcessor * b) const;
00573     bool followsSet (const VisibilityProcessor * a, const VpSet & vpSet) const;
00574     void orderContents ();
00575     void remapPorts (VpData & data, const VisibilityProcessor *);
00576     pair<VpPort,VpPort> validateConnectionPorts (VisibilityProcessor * sourceVp,
00577                                                  const String &  sourcePortName,
00578                                                  VisibilityProcessor * sinkVp,
00579                                                  const String &  sinkPortName);
00580 };
00581 
00582 class VpEngine {
00583     // Runs a VisibilityProcessor over the data reached through a VisibilityIterator.
00584     friend class VisibilityProcessor;
00585 
00586 public:
00587     // Starts out with no iterator attached.
00588     VpEngine () : vi_p (NULL) {}
00589 
00590     // Process the data set swept by the VisibilityIterator using the
00591     // VisibilityProcessor provided with the optionally specified port
00592     // as the input.
00593 
00594     void process (VisibilityProcessor & processor,
00595                   ROVisibilityIterator & vi,
00596                   const String & inputPortName);
00597     // Same, taking the port itself; the default is a default-constructed VpPort.
00598     void process (VisibilityProcessor & processor,
00599                   ROVisibilityIterator & vi,
00600                   const VpPort & inputPort = VpPort ());
00601     // Class-wide logging helpers; log takes a format string and variadic arguments.
00602     static Int getLogLevel ();
00603     static void log (const String & format, ...);
00604     static String getAipsRcBase ();
00605 
00606 private:
00607 
00608     ROVisibilityIterator * vi_p; // [use] -- presumably non-owning; confirm
00609     // Logging state shared by all engines (static members).
00610     static Int logLevel_p;
00611     static LogIO * logIo_p;
00612     static Bool loggingInitialized_p;
00613     static LogSink * logSink_p;
00614 
00615     static Bool initializeLogging ();
00616 
00617     ROVisibilityIterator * getVi (); // available to VisibilityProcessor through the friend declaration
00618 
00619 };
00620 
00621 } // end namespace vpf
00622 
00623 } // end namespace casa
00624 
00625 
00626 /*
00627 
00628     VisibilityProcessor vp1;
00629     VisibilityProcessor vp2;
00630     VpContainer vpc1;
00631 
00632     vpc1.add (vp1);
00633     vpc1.add (vp2);
00634 
00635     vpc1.connect (vp1.getOutput (Out), vp2.getInput (In));
00636     vpc1.connect (vpc1.getInput (In), vp1.getInput (In));
00637     vpc1.connect (vp2.getOutput (Out), vpc1.getOutput (Out));
00638 
00639     VpContainer vpc2;
00640     VpContainer vpc0;
00641 
00642     vpc0.add (vpc1, vpc2);
00643     vpc0.connect (vpc1.getOutput (Out), vpc2.getInput (In));
00644     vpc0.connect (vpc0.getInput (In), vpc1.getInput (In));
00645     vpc0.connect (vpc1.getOutput (Out), vpc0.getOutput (Out));
00646 
00647     vpc0.validate ();
00648 
00649  */
00650 
00651 
00652 
00653 #endif /* VISIBILITYPROCESSING_H_ */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 31 Aug 2016 for casa by  doxygen 1.6.1