DataAccumulator.h

Go to the documentation of this file.
00001 /*
00002  * DataAccumulator.h
00003  *
00004  *  Created on: Jan 18, 2016
00005  *      Author: nakazato
00006  */
00007 
00008 #ifndef SINGLEDISH_FILLER_DATAACCUMULATOR_H_
00009 #define SINGLEDISH_FILLER_DATAACCUMULATOR_H_
00010 
00011 #include <singledish/Filler/DataRecord.h>
00012 #include <singledish/Filler/FillerUtil.h>
00013 
00014 #include <vector>
00015 #include <cassert>
00016 #include <memory>
00017 #include <algorithm>
00018 
00019 #include <casacore/casa/BasicSL/String.h>
00020 #include <casacore/casa/Arrays/Vector.h>
00021 #include <casacore/casa/Arrays/Matrix.h>
00022 #include <casacore/casa/Arrays/ArrayMath.h>
00023 #include <casacore/casa/Arrays/ArrayIO.h>
00024 #include <casacore/casa/Containers/Record.h>
00025 
00026 #include <casacore/measures/Measures/Stokes.h>
00027 
00028 #include <casacore/tables/Tables/TableRecord.h>
00029 
00030 using namespace casacore;
00031 
00032 namespace {
00033 template<class T>
00034 inline void resizeTo(T &array, IPosition const &shape) {
00035   if (array.shape() != shape) {
00036     array.resize(shape, False);
00037   }
00038 }
00039 
/**
 * Copy the first @p n elements of @p src into @p dst, in ascending order.
 *
 * @param n   number of elements to copy; nothing is copied when n <= 0
 * @param src first element to read
 * @param dst first element to write
 */
template<class T>
inline void setValue1(ssize_t n, T const *src, T *dst) {
  T const *from = src;
  T *to = dst;
  for (ssize_t remaining = n; remaining > 0; --remaining) {
    *to++ = *from++;
  }
}
00046 
00047 template<class T>
00048 inline void setValueToMatrixColumn(Vector<T> const &src, ssize_t icolumn,
00049     Matrix<T> &dst) {
00050   IPosition const &shape = dst.shape();
00051   ssize_t const nrow = shape[0];
00052   ssize_t const ncolumn = shape[1];
00053   if (icolumn >= ncolumn) {
00054     throw AipsError("Specified column doesn't exist.");
00055   }
00056 
00057   Bool b1, b2;
00058   T *dst_p = dst.getStorage(b1);
00059   T *work_p = dst_p + icolumn * nrow;
00060   T const *src_p = src.getStorage(b2);
00061 
00062   setValue1(nrow, src_p, work_p);
00063 
00064   src.freeStorage(src_p, b2);
00065   dst.putStorage(dst_p, b1);
00066 }
00067 
00068 template<class T, class Executor>
00069 inline void transposeMatrix(ssize_t n, ssize_t offset_src, Matrix<T> const &src,
00070     Matrix<T> &dst) {
00071   Bool b1, b2;
00072   T const *src_p = src.getStorage(b1);
00073   T *dst_p = dst.getStorage(b2);
00074   T const *wsrc_p = src_p + offset_src * n;
00075   T *wdst_p = dst_p;
00076 
00077   Executor::execute(n, wsrc_p, wdst_p);
00078 
00079   src.freeStorage(src_p, b1);
00080   dst.putStorage(dst_p, b2);
00081 }
00082 
/**
 * Executor for a single row: copies @p n elements from @p src to @p dst
 * unchanged.
 */
struct ExecuteMatrix1 {
  template<class T>
  static void execute(ssize_t n, T const *src, T *dst) {
    for (ssize_t k = 0; k < n; ++k) {
      dst[k] = src[k];
    }
  }
};
00089 
/**
 * Executor that interleaves two consecutive runs of @p n elements.
 *
 * Reads src[0..n) and src[n..2n) and writes them alternately, so that
 * dst[2*i] comes from the first run and dst[2*i + 1] from the second.
 */
struct ExecuteMatrix2 {
  template<class T>
  static void execute(ssize_t n, T const *src, T *dst) {
    T const *first = src;
    T const *second = src + n;
    T *out = dst;
    for (ssize_t k = 0; k < n; ++k) {
      *out++ = first[k];
      *out++ = second[k];
    }
  }
};
00101 
/**
 * Executor for the four-row "cross" layout.
 *
 * The generic version is a guard: it performs no copy and always throws.
 * The usable implementation is the explicit specialization for Bool
 * defined right after this struct.
 */
struct ExecuteMatrix4X {
  /**
   * @throws std::runtime_error always; the message names the unsupported
   *         operation so the failure can be diagnosed from the log.
   */
  template<class T>
  static void execute(ssize_t /*n*/, T const * /*src*/, T * /*dst*/) {
    throw std::runtime_error(
        "ExecuteMatrix4X::execute is only implemented for Bool");
  }
};
00108 
00109 template<>
00110 inline void ExecuteMatrix4X::execute(ssize_t n, Bool const *src, Bool *dst) {
00111   Bool const *row0_p = src + 0 * n;
00112   Bool const *row1_p = src + 1 * n;
00113   Bool const *row2_p = src + 2 * n;
00114   Bool const *row3_p = src + 3 * n;
00115   for (ssize_t i = 0; i < n; ++i) {
00116     dst[4 * i + 0] = row0_p[i];
00117     Bool b = row2_p[i] || row3_p[i];
00118     dst[4 * i + 1] = b;
00119     dst[4 * i + 2] = b;
00120     dst[4 * i + 3] = row1_p[i];
00121   }
00122 }
00123 
/**
 * Executor that interleaves four consecutive runs of @p n elements.
 *
 * dst[4*i + r] receives src[r*n + i] for r = 0..3.
 */
struct ExecuteMatrix4 {
  template<class T>
  static void execute(ssize_t n, T const *src, T *dst) {
    T const *run0 = src;
    T const *run1 = run0 + n;
    T const *run2 = run1 + n;
    T const *run3 = run2 + n;
    T *out = dst;
    for (ssize_t k = 0; k < n; ++k, out += 4) {
      out[0] = run0[k];
      out[1] = run1[k];
      out[2] = run2[k];
      out[3] = run3[k];
    }
  }
};
00139 
00140 inline void transposeMatrix4F2C(ssize_t n, Matrix<Float> const &src,
00141     Matrix<Complex> &dst) {
00142   Bool b1, b2;
00143   Float const *src_p = src.getStorage(b1);
00144   Complex *dst_p = dst.getStorage(b2);
00145 
00146   Float const *row0_p = src_p + 0 * n;
00147   Float const *row1_p = src_p + 1 * n;
00148   Float const *row2_p = src_p + 2 * n;
00149   Float const *row3_p = src_p + 3 * n;
00150   for (ssize_t i = 0; i < n; ++i) {
00151     dst_p[4 * i].real(row0_p[i]);
00152     dst_p[4 * i].imag(0.0f);
00153     Float fr = row2_p[i];
00154     Float fi = row3_p[i];
00155     dst_p[4 * i + 1].real(fr);
00156     dst_p[4 * i + 1].imag(fi);
00157     dst_p[4 * i + 2].real(fr);
00158     dst_p[4 * i + 2].imag(-fi);
00159     dst_p[4 * i + 3].real(row1_p[i]);
00160     dst_p[4 * i + 3].imag(0.0f);
00161   }
00162 
00163   src.freeStorage(src_p, b1);
00164   dst.putStorage(dst_p, b2);
00165 }
00166 }
00167 
00168 namespace casa { //# NAMESPACE CASA - BEGIN
00169 namespace sdfiller { //# NAMESPACE SDFILLER - BEGIN
00170 
00171 class DataAccumulator;
00172 
00173 class DataChunk {
00174 public:
00175   friend DataAccumulator;
00176 
00177   DataChunk(String const &poltype) :
00178       num_pol_max_(4), num_chan_(0), data_(), flag_(),
00179       flag_row_(num_pol_max_, False), tsys_(), tcal_(),
00180       weight_(num_pol_max_, 1.0f), sigma_(weight_), poltype_(poltype),
00181       corr_type_(), filled_(NoData()), get_chunk_(nullptr),
00182       get_num_pol_(nullptr) {
00183     POST_START;
00184 
00185     setPolType(poltype);
00186 
00187     POST_END;
00188   }
00189 
00190   virtual ~DataChunk() {
00191   }
00192 
00193   String getPolType() const {
00194     return poltype_;
00195   }
00196 
00197   void resetPolType(String const &poltype) {
00198     initialize(num_chan_);
00199     setPolType(poltype);
00200   }
00201 
00202   uInt getNumPol() const {
00203     return (*this.*get_num_pol_)();
00204   }
00205 
00206   void initialize(size_t num_chan) {
00207     num_chan_ = num_chan;
00208     IPosition const shape(2, num_chan_, num_pol_max_);
00209     ::resizeTo(data_, shape);
00210     ::resizeTo(flag_, shape);
00211     ::resizeTo(tsys_, shape);
00212     ::resizeTo(tcal_, shape);
00213     tsys_ = -1.0f;
00214     tcal_ = -1.0f;
00215     filled_ = NoData();
00216   }
00217 
00218   void clear() {
00219     num_chan_ = 0;
00220     filled_ = NoData();
00221   }
00222 
00223   bool readyToWrite() {
00224     return true;
00225   }
00226 
00227   bool accumulate(DataRecord const &record) {
00228     POST_START;
00229 
00230     if (!isValidRecord(record)) {
00231       return false;
00232     }
00233 
00234     uInt polid = record.polno;
00235 
00236     if (num_pol_max_ <= polid) {
00237       return false;
00238     }
00239     Vector<Float> const &data = record.data;
00240     if (num_chan_ == 0) {
00241       size_t num_chan = data.size();
00242       initialize(num_chan);
00243     }
00244     Vector<Bool> const &flag = record.flag;
00245     Bool flagrow = record.flag_row;
00246 
00247     if (data.shape() != flag.shape()) {
00248       return false;
00249     }
00250 
00251     Vector<Float> tsys;
00252     if (!record.tsys.empty()) {
00253 //      std::cout << "tsys is not empty: " << record.tsys << std::endl;
00254       tsys.assign(record.tsys);
00255     }
00256     Vector<Float> tcal;
00257     if (!record.tcal.empty()) {
00258 //      std::cout << "tcal is not empty: " << record.tcal << std::endl;
00259       tcal.assign(record.tcal);
00260     }
00261 
00262     if (data.nelements() != num_chan_) {
00263       return false;
00264     }
00265 
00266     //data_.column(polid) = data;
00267     ::setValueToMatrixColumn(data, polid, data_);
00268     //flag_.column(polid) = flag;
00269     ::setValueToMatrixColumn(flag, polid, flag_);
00270     flag_row_[polid] = flagrow;
00271     if (tsys.size() == num_chan_) {
00272       //tsys_.column(polid) = tsys;
00273       ::setValueToMatrixColumn(tsys, polid, tsys_);
00274     } else if (!tsys.empty()) {
00275       tsys_(0, polid) = tsys[0];
00276     }
00277     if (tcal.size() == num_chan_) {
00278       //tcal_.column(polid) = tcal;
00279       ::setValueToMatrixColumn(tcal, polid, tcal_);
00280     } else if (!tcal.empty()) {
00281       tcal_(0, polid) = tcal[0];
00282     }
00283     filled_ |= 0x01 << polid;
00284 
00285     return true;
00286   }
00287 
00288   bool get(MSDataRecord &record) {
00289     bool return_value = (*this.*get_chunk_)(record);
00290     return return_value;
00291   }
00292 
00293 private:
00294   static constexpr unsigned char NoData() {
00295     return 0x00;
00296   }
00297   static constexpr unsigned char SinglePol0() {
00298     return 0x01;
00299   }
00300   static constexpr unsigned char SinglePol1() {
00301     return 0x01 << 1;
00302   }
00303   static constexpr unsigned char SinglePol2() {
00304     return 0x01 << 2;
00305   }
00306   static constexpr unsigned char SinglePol3() {
00307     return 0x01 << 3;
00308   }
00309   static constexpr unsigned char DualPol() {
00310     return SinglePol0() | SinglePol1();
00311   }
00312   static constexpr unsigned char FullPol() {
00313     return SinglePol0() | SinglePol1() | SinglePol2() | SinglePol3();
00314   }
00315   bool isFullPol() const {
00316     return judgePol(FullPol());
00317   }
00318   bool isDualPol() const {
00319     return judgePol(DualPol());
00320   }
00321   bool isSinglePol0() const {
00322     return judgePol(SinglePol0());
00323   }
00324   bool isSinglePol1() const {
00325     return judgePol(SinglePol1());
00326   }
00327   bool judgePol(unsigned char const pol) const {
00328     return (filled_ & pol) == pol;
00329   }
00330   bool isValidRecord(DataRecord const &record) {
00331     return !record.data.empty() && !record.flag.empty();
00332   }
00333   void setPolType(String const &poltype) {
00334     POST_START;
00335 
00336     poltype_ = poltype;
00337     if (poltype_ == "linear") {
00338       get_chunk_ = &DataChunk::getLinear;
00339       get_num_pol_ = &DataChunk::getNumPolLinear;
00340       corr_type_.resize(4);
00341       corr_type_[0] = Stokes::XX;
00342       corr_type_[1] = Stokes::XY;
00343       corr_type_[2] = Stokes::YX;
00344       corr_type_[3] = Stokes::YY;
00345     } else if (poltype_ == "circular") {
00346       get_chunk_ = &DataChunk::getCircular;
00347       get_num_pol_ = &DataChunk::getNumPolCircular;
00348       corr_type_.resize(4);
00349       corr_type_[0] = Stokes::RR;
00350       corr_type_[1] = Stokes::RL;
00351       corr_type_[2] = Stokes::LR;
00352       corr_type_[3] = Stokes::LL;
00353     } else if (poltype_ == "stokes") {
00354       get_chunk_ = &DataChunk::getStokes;
00355       get_num_pol_ = &DataChunk::getNumPolStokes;
00356       corr_type_.resize(4);
00357       corr_type_[0] = Stokes::I;
00358       corr_type_[1] = Stokes::Q;
00359       corr_type_[2] = Stokes::U;
00360       corr_type_[3] = Stokes::V;
00361     } else if (poltype_ == "linpol") {
00362       get_chunk_ = &DataChunk::getLinpol;
00363       get_num_pol_ = &DataChunk::getNumPolLinpol;
00364       corr_type_.resize(2);
00365       corr_type_[0] = Stokes::Plinear;
00366       corr_type_[1] = Stokes::Pangle;
00367     } else {
00368       throw AipsError(String("Invalid poltype") + poltype);
00369     }
00370 
00371     POST_END;
00372   }
00373   size_t const num_pol_max_;
00374   size_t num_chan_;
00375   Matrix<Float> data_;
00376   Matrix<Bool> flag_;
00377   Vector<Bool> flag_row_;
00378   Matrix<Float> tsys_;
00379   Matrix<Float> tcal_;
00380   Vector<Float> weight_;
00381   Vector<Float> sigma_;
00382   String poltype_;
00383   Vector<Int> corr_type_;
00384   unsigned char filled_;
00385   bool (DataChunk::*get_chunk_)(MSDataRecord &record);
00386   uInt (DataChunk::*get_num_pol_)() const;
00387 
00388   void setTsys2(MSDataRecord &record) {
00389     if (num_chan_ == 1) {
00390       record.setTsysSize(2, 1);
00391       record.tsys(0, 0) = tsys_(0, 0);
00392       record.tsys(1, 0) = tsys_(0, 1);
00393     } else {
00394       Float tsys00 = tsys_(0, 0);
00395       Float tsys01 = tsys_(0, 1);
00396       Float tsys10 = tsys_(1, 0);
00397       Float tsys11 = tsys_(1, 1);
00398       if ((tsys00 > 0.0f && tsys10 > 0.0f)
00399           || (tsys01 > 0.0f && tsys11 > 0.0f)) {
00400         record.setTsysSize(2, num_chan_);
00401         transposeMatrix<Float, ExecuteMatrix2>(num_chan_, 0, tsys_,
00402             record.tsys);
00403       } else if (tsys00 > 0.0f || tsys01 > 0.0f) {
00404         record.setTsysSize(2, 1);
00405         record.tsys(0, 0) = tsys_(0, 0);
00406         record.tsys(1, 0) = tsys_(0, 1);
00407       }
00408     }
00409   }
00410 
00411   void setTcal2(MSDataRecord &record) {
00412     if (num_chan_ == 1) {
00413       record.setTcalSize(2, 1);
00414       record.tcal(0, 0) = tcal_(0, 0);
00415       record.tcal(1, 0) = tcal_(0, 1);
00416     } else {
00417       Float tcal00 = tcal_(0, 0);
00418       Float tcal01 = tcal_(0, 1);
00419       Float tcal10 = tcal_(1, 0);
00420       Float tcal11 = tcal_(1, 1);
00421       if ((tcal00 > 0.0f && tcal10 > 0.0f)
00422           || (tcal01 > 0.0f && tcal11 > 0.0f)) {
00423         record.setTcalSize(2, num_chan_);
00424         transposeMatrix<Float, ExecuteMatrix2>(num_chan_, 0, tcal_,
00425             record.tcal);
00426       } else if (tcal00 > 0.0f || tcal01 > 0.0f) {
00427         record.setTcalSize(2, 1);
00428         record.tcal(0, 0) = tcal_(0, 0);
00429         record.tcal(1, 0) = tcal_(0, 1);
00430       }
00431     }
00432   }
00433 
00434   void setTsys1(ssize_t start_src, MSDataRecord &record) {
00435     if (num_chan_ == 1) {
00436       record.setTsysSize(1, 1);
00437       record.tsys(0, 0) = tsys_(0, start_src);
00438     } else if (tsys_(0, start_src) > 0.0f && tsys_(1, start_src) > 0.0f) {
00439       // should be spectral Tsys
00440       record.setTsysSize(1, num_chan_);
00441       //record.tsys = -1;
00442       transposeMatrix<Float, ExecuteMatrix1>(num_chan_, start_src, tsys_,
00443           record.tsys);
00444       //record.tsys.row(0) = tsys_.column(0);
00445     } else if (tsys_(0, start_src) > 0.0f) {
00446       // scalar Tsys
00447       record.setTsysSize(1, 1);
00448       record.tsys(0, 0) = tsys_(0, start_src);
00449     }
00450   }
00451 
00452   void setTcal1(ssize_t start_src, MSDataRecord &record) {
00453     if (num_chan_ == 1) {
00454       record.setTcalSize(1, 1);
00455       record.tcal(0, 0) = tcal_(0, start_src);
00456     } else if (tcal_(0, start_src) > 0.0f && tcal_(1, start_src) > 0.0f) {
00457       // should be spectral Tsys
00458       record.setTcalSize(1, num_chan_);
00459       //record.tsys = -1;
00460       transposeMatrix<Float, ExecuteMatrix1>(num_chan_, start_src, tcal_,
00461           record.tcal);
00462       //record.tsys.row(0) = tsys_.column(0);
00463     } else if (tcal_(0, start_src) > 0.0f) {
00464       // scalar Tsys
00465       record.setTcalSize(1, 1);
00466       record.tcal(0, 0) = tcal_(0, start_src);
00467     }
00468   }
00469 
00470   bool getLinear(MSDataRecord &record) {
00471     POST_START;
00472 
00473     Vector<Float> weight;
00474     Vector<Float> sigma;
00475     if (isFullPol()) {
00476       // POL 0, 1, 2, and 3
00477 //      std::cout << "set data/flag" << std::endl;
00478       record.setComplex();
00479       record.setDataSize(4, num_chan_);
00480       transposeMatrix4F2C(num_chan_, data_, record.complex_data);
00481       transposeMatrix<Bool, ExecuteMatrix4X>(num_chan_, 0, flag_, record.flag);
00482       record.flag_row = anyEQ(flag_row_, True);
00483 //      std::cout << "weight = " << record.weight << std::endl;
00484 
00485 //      std::cout << "set tsys" << std::endl;
00486       setTsys2(record);
00487 
00488 //      std::cout << "set tcal " << tcal_ << std::endl;
00489       setTcal2(record);
00490 
00491       record.num_pol = 4;
00492       record.corr_type = corr_type_;
00493     } else if (isDualPol()) {
00494       // POL 0 and 1
00495 //      std::cout << "set data/flag" << std::endl;
00496       record.setFloat();
00497       record.setDataSize(2, num_chan_);
00498       transposeMatrix<Float, ExecuteMatrix2>(num_chan_, 0, data_,
00499           record.float_data);
00500       transposeMatrix<Bool, ExecuteMatrix2>(num_chan_, 0, flag_, record.flag);
00501       record.flag_row = flag_row_[0] || flag_row_[1];
00502 //      std::cout << "weight = " << record.weight << std::endl;
00503 
00504 //      std::cout << "set tsys" << std::endl;
00505       setTsys2(record);
00506 
00507 //      std::cout << "set tcal " << tcal_ << std::endl;
00508       setTcal2(record);
00509 
00510       record.num_pol = 2;
00511       record.corr_type[0] = corr_type_[0];
00512       record.corr_type[1] = corr_type_[3];
00513     } else if (isSinglePol0()) {
00514       // only POL 0
00515 //      std::cout << "set data/flag (pol 0)" << std::endl;
00516       record.setFloat();
00517       record.setDataSize(1, num_chan_);
00518       transposeMatrix<Float, ExecuteMatrix1>(num_chan_, 0, data_,
00519           record.float_data);
00520       transposeMatrix<Bool, ExecuteMatrix1>(num_chan_, 0, flag_, record.flag);
00521       record.flag_row = flag_row_(0);
00522 
00523       setTsys1(0, record);
00524 
00525 //      std::cout << "set tcal " << tcal_ << std::endl;
00526       setTcal1(0, record);
00527 
00528       record.num_pol = 1;
00529       record.corr_type[0] = corr_type_[0];
00530     } else if (isSinglePol1()) {
00531       // only POL 1
00532 //      std::cout << "set data/flag (pol 1)" << std::endl;
00533       record.setFloat();
00534       record.setDataSize(1, num_chan_);
00535       transposeMatrix<Float, ExecuteMatrix1>(num_chan_, 1, data_,
00536           record.float_data);
00537       transposeMatrix<Bool, ExecuteMatrix1>(num_chan_, 1, flag_, record.flag);
00538       record.flag_row = flag_row_(1);
00539 
00540       setTsys1(1, record);
00541 
00542 //      std::cout << "set tcal " << tcal_ << std::endl;
00543       setTcal1(1, record);
00544 
00545       record.num_pol = 1;
00546       record.corr_type[0] = corr_type_[3];
00547     } else {
00548 //      std::cout << "DataChunk is not ready for get" << std::endl;
00549       return false;
00550     }
00551 
00552     POST_END;
00553     return true;
00554   }
00555 
00556   bool getCircular(MSDataRecord &record) {
00557     return getLinear(record);
00558   }
00559 
00560   bool getStokes(MSDataRecord &record) {
00561     POST_START;
00562 
00563     record.setFloat();
00564     if (isFullPol()) {
00565       record.setDataSize(4, num_chan_);
00566       transposeMatrix<Float, ExecuteMatrix4>(num_chan_, 0, data_,
00567           record.float_data);
00568       transposeMatrix<Bool, ExecuteMatrix4>(num_chan_, 0, flag_, record.flag);
00569       record.flag_row = anyTrue(flag_row_);
00570 
00571       record.num_pol = 4;
00572       record.corr_type = corr_type_;
00573     } else if (isSinglePol0()) {
00574       record.setDataSize(1, num_chan_);
00575       transposeMatrix<Float, ExecuteMatrix1>(num_chan_, 0, data_,
00576           record.float_data);
00577       transposeMatrix<Bool, ExecuteMatrix1>(num_chan_, 0, flag_, record.flag);
00578       record.flag_row = flag_row_[0];
00579 
00580       record.num_pol = 1;
00581       record.corr_type[0] = corr_type_[0];
00582     } else {
00583       return false;
00584     }
00585 
00586     POST_END;
00587     return true;
00588   }
00589 
00590   bool getLinpol(MSDataRecord &record) {
00591     POST_START;
00592 
00593     record.setFloat();
00594     if (isDualPol()) {
00595       // POL 0 and 1
00596       record.setDataSize(2, num_chan_);
00597       transposeMatrix<Float, ExecuteMatrix2>(num_chan_, 0, data_,
00598           record.float_data);
00599       transposeMatrix<Bool, ExecuteMatrix2>(num_chan_, 0, flag_, record.flag);
00600       record.flag_row = flag_row_[0] || flag_row_[1];
00601 
00602       record.num_pol = 2;
00603       record.corr_type = corr_type_;
00604     } else if (isSinglePol0()) {
00605       record.setDataSize(1, num_chan_);
00606       transposeMatrix<Float, ExecuteMatrix1>(num_chan_, 0, data_,
00607           record.float_data);
00608       transposeMatrix<Bool, ExecuteMatrix1>(num_chan_, 0, flag_, record.flag);
00609       record.flag_row = flag_row_[0];
00610 
00611       record.num_pol = 1;
00612       record.corr_type[0] = corr_type_[0];
00613     } else {
00614       return false;
00615     }
00616 
00617     POST_END;
00618     return true;
00619   }
00620 
00621   uInt getNumPolLinear() const {
00622     if (isFullPol()) {
00623       return 4;
00624     } else if (isDualPol()) {
00625       return 2;
00626     } else if (isSinglePol0() || isSinglePol1()) {
00627       return 1;
00628     } else {
00629       return 0;
00630     }
00631   }
00632 
00633   uInt getNumPolCircular() const {
00634     return getNumPolLinear();
00635   }
00636 
00637   uInt getNumPolStokes() const {
00638     if (isFullPol()) {
00639       return 4;
00640     } else if (isSinglePol0()) {
00641       return 1;
00642     } else {
00643       return 0;
00644     }
00645   }
00646 
00647   uInt getNumPolLinpol() const {
00648     if (isDualPol()) {
00649       return 2;
00650     } else if (isSinglePol0()) {
00651       return 1;
00652     } else {
00653       return 0;
00654     }
00655   }
00656 };
00657 
00658 class DataAccumulator {
00659 private:
00660   struct DataAccumulatorKey {
00661     Int antenna_id;
00662     Int field_id;
00663     Int feed_id;
00664     Int spw_id;
00665     String pol_type;
00666     String intent;
00667 
00668     template<class T, class C>
00669     bool comp(T const &a, T const &b, C const &c) const {
00670       if (a < b) {
00671         return true;
00672       } else if (a == b) {
00673         return c();
00674       } else {
00675         return false;
00676       }
00677     }
00678 
00679     bool operator()(DataAccumulatorKey const &lhs,
00680         DataAccumulatorKey const &rhs) const {
00681       return comp(lhs.antenna_id, rhs.antenna_id,
00682           [&]() {return comp(lhs.field_id, rhs.field_id,
00683                 [&]() {return comp(lhs.feed_id, rhs.feed_id,
00684                       [&]() {return comp(lhs.spw_id, rhs.spw_id,
00685                             [&]() {return comp(lhs.pol_type, rhs.pol_type,
00686                                   [&]() {return comp(lhs.intent, rhs.intent,
00687                                         []() {return false;});});});});});});
00688     }
00689   };
00690 
00691 public:
00692   DataAccumulator() :
00693       pool_(), antenna_id_(), spw_id_(), field_id_(), feed_id_(), scan_(),
00694       subscan_(), intent_(), direction_(), interval_(), indexer_(), time_(-1.0),
00695       is_free_() {
00696   }
00697 
00698   virtual ~DataAccumulator() {
00699     POST_START;
00700 
00701     for (auto iter = pool_.begin(); iter != pool_.end(); ++iter) {
00702       delete *iter;
00703     }
00704 
00705     POST_END;
00706   }
00707 
00708   size_t getNumberOfChunks() const {
00709     return pool_.size();
00710   }
00711 
00712   size_t getNumberOfActiveChunks() const {
00713     return std::count_if(pool_.begin(), pool_.end(), [](DataChunk * const &c) {
00714       return c->getNumPol() > 0;
00715     });
00716   }
00717 
00718   bool queryForGet(DataRecord const &record) const {
00719     Double const time = record.time;
00720     bool is_ready = (0.0 <= time_) && !(time_ == time);
00721     return is_ready;
00722   }
00723 
00724   bool queryForGet(Double const &time) const {
00725     bool is_ready = (0.0 <= time_) && !(time_ == time);
00726     return is_ready;
00727   }
00728 
00729   void clear() {
00730     for (auto iter = pool_.begin(); iter != pool_.end(); ++iter) {
00731       (*iter)->clear();
00732     }
00733     time_ = -1.0;
00734   }
00735 
00736   bool get(size_t ichunk, MSDataRecord &record) {
00737     POST_START;
00738 
00739     if (pool_.size() == 0) {
00740       return false;
00741     } else if (ichunk >= pool_.size()) {
00742       return false;
00743     }
00744     bool status = pool_[ichunk]->get(record);
00745 //    std::cout << "get Chunk status = " << status << std::endl;
00746     if (!status) {
00747       record.clear();
00748       return status;
00749     }
00750     record.time = time_;
00751     record.pol_type = pool_[ichunk]->getPolType();
00752     record.antenna_id = antenna_id_[ichunk];
00753     record.spw_id = spw_id_[ichunk];
00754     record.field_id = field_id_[ichunk];
00755     record.feed_id = feed_id_[ichunk];
00756     record.scan = scan_[ichunk];
00757     record.subscan = subscan_[ichunk];
00758     record.intent = intent_[ichunk];
00759     record.direction = direction_[ichunk];
00760     record.interval = interval_[ichunk];
00761     record.temperature = temperature_[ichunk];
00762     record.pressure = pressure_[ichunk];
00763     record.rel_humidity = rel_humidity_[ichunk];
00764     record.wind_speed = wind_speed_[ichunk];
00765     record.wind_direction = wind_direction_[ichunk];
00766 
00767     POST_END;
00768     return status;
00769   }
00770 
00771   bool accumulate(DataRecord const &record) {
00772     POST_START;
00773 
00774     if (!isValidRecord(record)) {
00775 //      std::cout << "record is not a valid one" << std::endl;
00776       return false;
00777     }
00778 
00779     Double time = record.time;
00780     if (time_ < 0.0) {
00781       time_ = time;
00782     }
00783     if (time_ != time) {
00784 //      std::cout << "timestamp mismatch" << std::endl;
00785       return false;
00786     }
00787     Int antennaid = record.antenna_id;
00788     Int spwid = record.spw_id;
00789     Int fieldid = record.field_id;
00790     Int feedid = record.feed_id;
00791     Int scan = record.scan;
00792     Int subscan = record.subscan;
00793     String intent = record.intent;
00794     String poltype = record.pol_type;
00795     DataAccumulatorKey key;
00796     key.antenna_id = record.antenna_id;
00797     key.field_id = record.field_id;
00798     key.feed_id = record.feed_id;
00799     key.spw_id = record.spw_id;
00800     key.intent = record.intent;
00801     key.pol_type = record.pol_type;
00802     Matrix<Double> const &direction = record.direction;
00803     Double interval = record.interval;
00804     Float temperature = record.temperature;
00805     Float pressure = record.pressure;
00806     Float rel_humidity = record.rel_humidity;
00807     Float wind_speed = record.wind_speed;
00808     Float wind_direction = record.wind_direction;
00809     bool status = false;
00810     auto iter = indexer_.find(key);
00811     if (iter != indexer_.end()) {
00812       uInt index = iter->second;
00813       status = pool_[index]->accumulate(record);
00814       if (status) {
00815         antenna_id_[index] = antennaid;
00816         spw_id_[index] = spwid;
00817         field_id_[index] = fieldid;
00818         feed_id_[index] = feedid;
00819         scan_[index] = scan;
00820         subscan_[index] = subscan;
00821         intent_[index] = intent;
00822         direction_[index].assign(direction);
00823         interval_[index] = interval;
00824         temperature_[index] = temperature;
00825         pressure_[index] = pressure;
00826         rel_humidity_[index] = rel_humidity;
00827         wind_speed_[index] = wind_speed;
00828         wind_direction_[index] = wind_direction;
00829       }
00830     } else {
00831       pool_.push_back(new DataChunk(poltype));
00832       antenna_id_.push_back(-1);
00833       spw_id_.push_back(-1);
00834       field_id_.push_back(-1);
00835       feed_id_.push_back(-1);
00836       scan_.push_back(-1);
00837       subscan_.push_back(-1);
00838       intent_.push_back("");
00839       direction_.push_back(Matrix<Double>());
00840       interval_.push_back(-1.0);
00841       temperature_.push_back(0.0f);
00842       pressure_.push_back(0.0f);
00843       rel_humidity_.push_back(0.0f);
00844       wind_speed_.push_back(0.0f);
00845       wind_direction_.push_back(0.0f);
00846       uInt index = pool_.size() - 1;
00847       indexer_[key] = index;
00848       status = pool_[index]->accumulate(record);
00849       if (status) {
00850         antenna_id_[index] = antennaid;
00851         spw_id_[index] = spwid;
00852         field_id_[index] = fieldid;
00853         feed_id_[index] = feedid;
00854         scan_[index] = scan;
00855         subscan_[index] = subscan;
00856         intent_[index] = intent;
00857         direction_[index].assign(direction);
00858         interval_[index] = interval;
00859         temperature_[index] = temperature;
00860         pressure_[index] = pressure;
00861         rel_humidity_[index] = rel_humidity;
00862         wind_speed_[index] = wind_speed;
00863         wind_direction_[index] = wind_direction;
00864       }
00865     }
00866 
00867 //    std::cout << "status = " << status << std::endl;
00868 //    std::cout << "key (a" << key.antenna_id << ",f" << key.field_id << ",s"
00869 //        << key.spw_id << ",i" << key.intent << ",p" << key.pol_type << ",d"
00870 //        << key.feed_id << "(index " << indexer_[key] << "): TIME="
00871 //        << time_ << " INTERVAL=" << interval << " polno=" << record.polno << std::endl;
00872     POST_END;
00873     return status;
00874   }
00875 
00876   String getPolType(size_t ichunk) const {
00877     assert(ichunk < pool_.size());
00878     return pool_[ichunk]->getPolType();
00879   }
00880 
00881   uInt getNumPol(size_t ichunk) const {
00882     assert(ichunk < pool_.size());
00883     return pool_[ichunk]->getNumPol();
00884   }
00885 
00886 private:
00887   bool isValidRecord(DataRecord const &record) {
00888 //    std::cout << record.time << " " << record.interval << " "
00889 //        << record.antenna_id << " " << record.field_id << " " << record.feed_id
00890 //        << " " << record.spw_id << " " << record.scan << " " << record.subscan
00891 //        << " " << record.direction << std::endl;
00892     return record.time > 0.0 && record.interval > 0.0 && record.antenna_id >= 0
00893         && record.field_id >= 0 && record.feed_id >= 0 && record.spw_id >= 0
00894         && record.scan >= 0 && record.subscan >= 0 && !record.direction.empty();
00895   }
00896 
00897   std::vector<DataChunk *> pool_;
00898   std::vector<Int> antenna_id_;
00899   std::vector<Int> spw_id_;
00900   std::vector<Int> field_id_;
00901   std::vector<Int> feed_id_;
00902   std::vector<Int> scan_;
00903   std::vector<Int> subscan_;
00904   std::vector<String> intent_;
00905   std::vector<Matrix<Double> > direction_;
00906   std::vector<Double> interval_;
00907   std::vector<Float> temperature_;
00908   std::vector<Float> pressure_;
00909   std::vector<Float> rel_humidity_;
00910   std::vector<Float> wind_speed_;
00911   std::vector<Float> wind_direction_;
00912   std::map<DataAccumulatorKey, uInt, DataAccumulatorKey> indexer_;
00913   Double time_;
00914   std::vector<bool> is_free_;
00915 };
00916 
00917 } //# NAMESPACE SDFILLER - END
00918 } //# NAMESPACE CASA - END
00919 
00920 #endif /* SINGLEDISH_FILLER_DATAACCUMULATOR_H_ */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 31 Aug 2016 for casa by  doxygen 1.6.1