00001
00002
00003
00004
00005
00006
00007
00008 #ifndef SINGLEDISH_FILLER_DATAACCUMULATOR_H_
00009 #define SINGLEDISH_FILLER_DATAACCUMULATOR_H_
00010
00011 #include <singledish/Filler/DataRecord.h>
00012 #include <singledish/Filler/FillerUtil.h>
00013
00014 #include <vector>
00015 #include <cassert>
00016 #include <memory>
00017 #include <algorithm>
00018
00019 #include <casacore/casa/BasicSL/String.h>
00020 #include <casacore/casa/Arrays/Vector.h>
00021 #include <casacore/casa/Arrays/Matrix.h>
00022 #include <casacore/casa/Arrays/ArrayMath.h>
00023 #include <casacore/casa/Arrays/ArrayIO.h>
00024 #include <casacore/casa/Containers/Record.h>
00025
00026 #include <casacore/measures/Measures/Stokes.h>
00027
00028 #include <casacore/tables/Tables/TableRecord.h>
00029
00030 using namespace casacore;
00031
00032 namespace {
00033 template<class T>
00034 inline void resizeTo(T &array, IPosition const &shape) {
00035 if (array.shape() != shape) {
00036 array.resize(shape, False);
00037 }
00038 }
00039
00040 template<class T>
00041 inline void setValue1(ssize_t n, T const *src, T *dst) {
00042 for (ssize_t i = 0; i < n; ++i) {
00043 dst[i] = src[i];
00044 }
00045 }
00046
00047 template<class T>
00048 inline void setValueToMatrixColumn(Vector<T> const &src, ssize_t icolumn,
00049 Matrix<T> &dst) {
00050 IPosition const &shape = dst.shape();
00051 ssize_t const nrow = shape[0];
00052 ssize_t const ncolumn = shape[1];
00053 if (icolumn >= ncolumn) {
00054 throw AipsError("Specified column doesn't exist.");
00055 }
00056
00057 Bool b1, b2;
00058 T *dst_p = dst.getStorage(b1);
00059 T *work_p = dst_p + icolumn * nrow;
00060 T const *src_p = src.getStorage(b2);
00061
00062 setValue1(nrow, src_p, work_p);
00063
00064 src.freeStorage(src_p, b2);
00065 dst.putStorage(dst_p, b1);
00066 }
00067
00068 template<class T, class Executor>
00069 inline void transposeMatrix(ssize_t n, ssize_t offset_src, Matrix<T> const &src,
00070 Matrix<T> &dst) {
00071 Bool b1, b2;
00072 T const *src_p = src.getStorage(b1);
00073 T *dst_p = dst.getStorage(b2);
00074 T const *wsrc_p = src_p + offset_src * n;
00075 T *wdst_p = dst_p;
00076
00077 Executor::execute(n, wsrc_p, wdst_p);
00078
00079 src.freeStorage(src_p, b1);
00080 dst.putStorage(dst_p, b2);
00081 }
00082
00083 struct ExecuteMatrix1 {
00084 template<class T>
00085 static void execute(ssize_t n, T const *src, T *dst) {
00086 setValue1(n, src, dst);
00087 }
00088 };
00089
00090 struct ExecuteMatrix2 {
00091 template<class T>
00092 static void execute(ssize_t n, T const *src, T *dst) {
00093 T const *row0_p = src;
00094 T const *row1_p = src + n;
00095 for (ssize_t i = 0; i < n; ++i) {
00096 dst[2 * i] = row0_p[i];
00097 dst[2 * i + 1] = row1_p[i];
00098 }
00099 }
00100 };
00101
00102 struct ExecuteMatrix4X {
00103 template<class T>
00104 static void execute(ssize_t , T const *, T *) {
00105 throw std::runtime_error("");
00106 }
00107 };
00108
00109 template<>
00110 inline void ExecuteMatrix4X::execute(ssize_t n, Bool const *src, Bool *dst) {
00111 Bool const *row0_p = src + 0 * n;
00112 Bool const *row1_p = src + 1 * n;
00113 Bool const *row2_p = src + 2 * n;
00114 Bool const *row3_p = src + 3 * n;
00115 for (ssize_t i = 0; i < n; ++i) {
00116 dst[4 * i + 0] = row0_p[i];
00117 Bool b = row2_p[i] || row3_p[i];
00118 dst[4 * i + 1] = b;
00119 dst[4 * i + 2] = b;
00120 dst[4 * i + 3] = row1_p[i];
00121 }
00122 }
00123
00124 struct ExecuteMatrix4 {
00125 template<class T>
00126 static void execute(ssize_t n, T const *src, T *dst) {
00127 T const *row0_p = src + 0 * n;
00128 T const *row1_p = src + 1 * n;
00129 T const *row2_p = src + 2 * n;
00130 T const *row3_p = src + 3 * n;
00131 for (ssize_t i = 0; i < n; ++i) {
00132 dst[4 * i + 0] = row0_p[i];
00133 dst[4 * i + 1] = row1_p[i];
00134 dst[4 * i + 2] = row2_p[i];
00135 dst[4 * i + 3] = row3_p[i];
00136 }
00137 }
00138 };
00139
00140 inline void transposeMatrix4F2C(ssize_t n, Matrix<Float> const &src,
00141 Matrix<Complex> &dst) {
00142 Bool b1, b2;
00143 Float const *src_p = src.getStorage(b1);
00144 Complex *dst_p = dst.getStorage(b2);
00145
00146 Float const *row0_p = src_p + 0 * n;
00147 Float const *row1_p = src_p + 1 * n;
00148 Float const *row2_p = src_p + 2 * n;
00149 Float const *row3_p = src_p + 3 * n;
00150 for (ssize_t i = 0; i < n; ++i) {
00151 dst_p[4 * i].real(row0_p[i]);
00152 dst_p[4 * i].imag(0.0f);
00153 Float fr = row2_p[i];
00154 Float fi = row3_p[i];
00155 dst_p[4 * i + 1].real(fr);
00156 dst_p[4 * i + 1].imag(fi);
00157 dst_p[4 * i + 2].real(fr);
00158 dst_p[4 * i + 2].imag(-fi);
00159 dst_p[4 * i + 3].real(row1_p[i]);
00160 dst_p[4 * i + 3].imag(0.0f);
00161 }
00162
00163 src.freeStorage(src_p, b1);
00164 dst.putStorage(dst_p, b2);
00165 }
00166 }
00167
00168 namespace casa {
00169 namespace sdfiller {
00170
00171 class DataAccumulator;
00172
00173 class DataChunk {
00174 public:
00175 friend DataAccumulator;
00176
00177 DataChunk(String const &poltype) :
00178 num_pol_max_(4), num_chan_(0), data_(), flag_(),
00179 flag_row_(num_pol_max_, False), tsys_(), tcal_(),
00180 weight_(num_pol_max_, 1.0f), sigma_(weight_), poltype_(poltype),
00181 corr_type_(), filled_(NoData()), get_chunk_(nullptr),
00182 get_num_pol_(nullptr) {
00183 POST_START;
00184
00185 setPolType(poltype);
00186
00187 POST_END;
00188 }
00189
00190 virtual ~DataChunk() {
00191 }
00192
00193 String getPolType() const {
00194 return poltype_;
00195 }
00196
00197 void resetPolType(String const &poltype) {
00198 initialize(num_chan_);
00199 setPolType(poltype);
00200 }
00201
00202 uInt getNumPol() const {
00203 return (*this.*get_num_pol_)();
00204 }
00205
00206 void initialize(size_t num_chan) {
00207 num_chan_ = num_chan;
00208 IPosition const shape(2, num_chan_, num_pol_max_);
00209 ::resizeTo(data_, shape);
00210 ::resizeTo(flag_, shape);
00211 ::resizeTo(tsys_, shape);
00212 ::resizeTo(tcal_, shape);
00213 tsys_ = -1.0f;
00214 tcal_ = -1.0f;
00215 filled_ = NoData();
00216 }
00217
00218 void clear() {
00219 num_chan_ = 0;
00220 filled_ = NoData();
00221 }
00222
00223 bool readyToWrite() {
00224 return true;
00225 }
00226
00227 bool accumulate(DataRecord const &record) {
00228 POST_START;
00229
00230 if (!isValidRecord(record)) {
00231 return false;
00232 }
00233
00234 uInt polid = record.polno;
00235
00236 if (num_pol_max_ <= polid) {
00237 return false;
00238 }
00239 Vector<Float> const &data = record.data;
00240 if (num_chan_ == 0) {
00241 size_t num_chan = data.size();
00242 initialize(num_chan);
00243 }
00244 Vector<Bool> const &flag = record.flag;
00245 Bool flagrow = record.flag_row;
00246
00247 if (data.shape() != flag.shape()) {
00248 return false;
00249 }
00250
00251 Vector<Float> tsys;
00252 if (!record.tsys.empty()) {
00253
00254 tsys.assign(record.tsys);
00255 }
00256 Vector<Float> tcal;
00257 if (!record.tcal.empty()) {
00258
00259 tcal.assign(record.tcal);
00260 }
00261
00262 if (data.nelements() != num_chan_) {
00263 return false;
00264 }
00265
00266
00267 ::setValueToMatrixColumn(data, polid, data_);
00268
00269 ::setValueToMatrixColumn(flag, polid, flag_);
00270 flag_row_[polid] = flagrow;
00271 if (tsys.size() == num_chan_) {
00272
00273 ::setValueToMatrixColumn(tsys, polid, tsys_);
00274 } else if (!tsys.empty()) {
00275 tsys_(0, polid) = tsys[0];
00276 }
00277 if (tcal.size() == num_chan_) {
00278
00279 ::setValueToMatrixColumn(tcal, polid, tcal_);
00280 } else if (!tcal.empty()) {
00281 tcal_(0, polid) = tcal[0];
00282 }
00283 filled_ |= 0x01 << polid;
00284
00285 return true;
00286 }
00287
00288 bool get(MSDataRecord &record) {
00289 bool return_value = (*this.*get_chunk_)(record);
00290 return return_value;
00291 }
00292
00293 private:
00294 static constexpr unsigned char NoData() {
00295 return 0x00;
00296 }
00297 static constexpr unsigned char SinglePol0() {
00298 return 0x01;
00299 }
00300 static constexpr unsigned char SinglePol1() {
00301 return 0x01 << 1;
00302 }
00303 static constexpr unsigned char SinglePol2() {
00304 return 0x01 << 2;
00305 }
00306 static constexpr unsigned char SinglePol3() {
00307 return 0x01 << 3;
00308 }
00309 static constexpr unsigned char DualPol() {
00310 return SinglePol0() | SinglePol1();
00311 }
00312 static constexpr unsigned char FullPol() {
00313 return SinglePol0() | SinglePol1() | SinglePol2() | SinglePol3();
00314 }
00315 bool isFullPol() const {
00316 return judgePol(FullPol());
00317 }
00318 bool isDualPol() const {
00319 return judgePol(DualPol());
00320 }
00321 bool isSinglePol0() const {
00322 return judgePol(SinglePol0());
00323 }
00324 bool isSinglePol1() const {
00325 return judgePol(SinglePol1());
00326 }
00327 bool judgePol(unsigned char const pol) const {
00328 return (filled_ & pol) == pol;
00329 }
00330 bool isValidRecord(DataRecord const &record) {
00331 return !record.data.empty() && !record.flag.empty();
00332 }
00333 void setPolType(String const &poltype) {
00334 POST_START;
00335
00336 poltype_ = poltype;
00337 if (poltype_ == "linear") {
00338 get_chunk_ = &DataChunk::getLinear;
00339 get_num_pol_ = &DataChunk::getNumPolLinear;
00340 corr_type_.resize(4);
00341 corr_type_[0] = Stokes::XX;
00342 corr_type_[1] = Stokes::XY;
00343 corr_type_[2] = Stokes::YX;
00344 corr_type_[3] = Stokes::YY;
00345 } else if (poltype_ == "circular") {
00346 get_chunk_ = &DataChunk::getCircular;
00347 get_num_pol_ = &DataChunk::getNumPolCircular;
00348 corr_type_.resize(4);
00349 corr_type_[0] = Stokes::RR;
00350 corr_type_[1] = Stokes::RL;
00351 corr_type_[2] = Stokes::LR;
00352 corr_type_[3] = Stokes::LL;
00353 } else if (poltype_ == "stokes") {
00354 get_chunk_ = &DataChunk::getStokes;
00355 get_num_pol_ = &DataChunk::getNumPolStokes;
00356 corr_type_.resize(4);
00357 corr_type_[0] = Stokes::I;
00358 corr_type_[1] = Stokes::Q;
00359 corr_type_[2] = Stokes::U;
00360 corr_type_[3] = Stokes::V;
00361 } else if (poltype_ == "linpol") {
00362 get_chunk_ = &DataChunk::getLinpol;
00363 get_num_pol_ = &DataChunk::getNumPolLinpol;
00364 corr_type_.resize(2);
00365 corr_type_[0] = Stokes::Plinear;
00366 corr_type_[1] = Stokes::Pangle;
00367 } else {
00368 throw AipsError(String("Invalid poltype") + poltype);
00369 }
00370
00371 POST_END;
00372 }
00373 size_t const num_pol_max_;
00374 size_t num_chan_;
00375 Matrix<Float> data_;
00376 Matrix<Bool> flag_;
00377 Vector<Bool> flag_row_;
00378 Matrix<Float> tsys_;
00379 Matrix<Float> tcal_;
00380 Vector<Float> weight_;
00381 Vector<Float> sigma_;
00382 String poltype_;
00383 Vector<Int> corr_type_;
00384 unsigned char filled_;
00385 bool (DataChunk::*get_chunk_)(MSDataRecord &record);
00386 uInt (DataChunk::*get_num_pol_)() const;
00387
00388 void setTsys2(MSDataRecord &record) {
00389 if (num_chan_ == 1) {
00390 record.setTsysSize(2, 1);
00391 record.tsys(0, 0) = tsys_(0, 0);
00392 record.tsys(1, 0) = tsys_(0, 1);
00393 } else {
00394 Float tsys00 = tsys_(0, 0);
00395 Float tsys01 = tsys_(0, 1);
00396 Float tsys10 = tsys_(1, 0);
00397 Float tsys11 = tsys_(1, 1);
00398 if ((tsys00 > 0.0f && tsys10 > 0.0f)
00399 || (tsys01 > 0.0f && tsys11 > 0.0f)) {
00400 record.setTsysSize(2, num_chan_);
00401 transposeMatrix<Float, ExecuteMatrix2>(num_chan_, 0, tsys_,
00402 record.tsys);
00403 } else if (tsys00 > 0.0f || tsys01 > 0.0f) {
00404 record.setTsysSize(2, 1);
00405 record.tsys(0, 0) = tsys_(0, 0);
00406 record.tsys(1, 0) = tsys_(0, 1);
00407 }
00408 }
00409 }
00410
00411 void setTcal2(MSDataRecord &record) {
00412 if (num_chan_ == 1) {
00413 record.setTcalSize(2, 1);
00414 record.tcal(0, 0) = tcal_(0, 0);
00415 record.tcal(1, 0) = tcal_(0, 1);
00416 } else {
00417 Float tcal00 = tcal_(0, 0);
00418 Float tcal01 = tcal_(0, 1);
00419 Float tcal10 = tcal_(1, 0);
00420 Float tcal11 = tcal_(1, 1);
00421 if ((tcal00 > 0.0f && tcal10 > 0.0f)
00422 || (tcal01 > 0.0f && tcal11 > 0.0f)) {
00423 record.setTcalSize(2, num_chan_);
00424 transposeMatrix<Float, ExecuteMatrix2>(num_chan_, 0, tcal_,
00425 record.tcal);
00426 } else if (tcal00 > 0.0f || tcal01 > 0.0f) {
00427 record.setTcalSize(2, 1);
00428 record.tcal(0, 0) = tcal_(0, 0);
00429 record.tcal(1, 0) = tcal_(0, 1);
00430 }
00431 }
00432 }
00433
00434 void setTsys1(ssize_t start_src, MSDataRecord &record) {
00435 if (num_chan_ == 1) {
00436 record.setTsysSize(1, 1);
00437 record.tsys(0, 0) = tsys_(0, start_src);
00438 } else if (tsys_(0, start_src) > 0.0f && tsys_(1, start_src) > 0.0f) {
00439
00440 record.setTsysSize(1, num_chan_);
00441
00442 transposeMatrix<Float, ExecuteMatrix1>(num_chan_, start_src, tsys_,
00443 record.tsys);
00444
00445 } else if (tsys_(0, start_src) > 0.0f) {
00446
00447 record.setTsysSize(1, 1);
00448 record.tsys(0, 0) = tsys_(0, start_src);
00449 }
00450 }
00451
00452 void setTcal1(ssize_t start_src, MSDataRecord &record) {
00453 if (num_chan_ == 1) {
00454 record.setTcalSize(1, 1);
00455 record.tcal(0, 0) = tcal_(0, start_src);
00456 } else if (tcal_(0, start_src) > 0.0f && tcal_(1, start_src) > 0.0f) {
00457
00458 record.setTcalSize(1, num_chan_);
00459
00460 transposeMatrix<Float, ExecuteMatrix1>(num_chan_, start_src, tcal_,
00461 record.tcal);
00462
00463 } else if (tcal_(0, start_src) > 0.0f) {
00464
00465 record.setTcalSize(1, 1);
00466 record.tcal(0, 0) = tcal_(0, start_src);
00467 }
00468 }
00469
00470 bool getLinear(MSDataRecord &record) {
00471 POST_START;
00472
00473 Vector<Float> weight;
00474 Vector<Float> sigma;
00475 if (isFullPol()) {
00476
00477
00478 record.setComplex();
00479 record.setDataSize(4, num_chan_);
00480 transposeMatrix4F2C(num_chan_, data_, record.complex_data);
00481 transposeMatrix<Bool, ExecuteMatrix4X>(num_chan_, 0, flag_, record.flag);
00482 record.flag_row = anyEQ(flag_row_, True);
00483
00484
00485
00486 setTsys2(record);
00487
00488
00489 setTcal2(record);
00490
00491 record.num_pol = 4;
00492 record.corr_type = corr_type_;
00493 } else if (isDualPol()) {
00494
00495
00496 record.setFloat();
00497 record.setDataSize(2, num_chan_);
00498 transposeMatrix<Float, ExecuteMatrix2>(num_chan_, 0, data_,
00499 record.float_data);
00500 transposeMatrix<Bool, ExecuteMatrix2>(num_chan_, 0, flag_, record.flag);
00501 record.flag_row = flag_row_[0] || flag_row_[1];
00502
00503
00504
00505 setTsys2(record);
00506
00507
00508 setTcal2(record);
00509
00510 record.num_pol = 2;
00511 record.corr_type[0] = corr_type_[0];
00512 record.corr_type[1] = corr_type_[3];
00513 } else if (isSinglePol0()) {
00514
00515
00516 record.setFloat();
00517 record.setDataSize(1, num_chan_);
00518 transposeMatrix<Float, ExecuteMatrix1>(num_chan_, 0, data_,
00519 record.float_data);
00520 transposeMatrix<Bool, ExecuteMatrix1>(num_chan_, 0, flag_, record.flag);
00521 record.flag_row = flag_row_(0);
00522
00523 setTsys1(0, record);
00524
00525
00526 setTcal1(0, record);
00527
00528 record.num_pol = 1;
00529 record.corr_type[0] = corr_type_[0];
00530 } else if (isSinglePol1()) {
00531
00532
00533 record.setFloat();
00534 record.setDataSize(1, num_chan_);
00535 transposeMatrix<Float, ExecuteMatrix1>(num_chan_, 1, data_,
00536 record.float_data);
00537 transposeMatrix<Bool, ExecuteMatrix1>(num_chan_, 1, flag_, record.flag);
00538 record.flag_row = flag_row_(1);
00539
00540 setTsys1(1, record);
00541
00542
00543 setTcal1(1, record);
00544
00545 record.num_pol = 1;
00546 record.corr_type[0] = corr_type_[3];
00547 } else {
00548
00549 return false;
00550 }
00551
00552 POST_END;
00553 return true;
00554 }
00555
00556 bool getCircular(MSDataRecord &record) {
00557 return getLinear(record);
00558 }
00559
00560 bool getStokes(MSDataRecord &record) {
00561 POST_START;
00562
00563 record.setFloat();
00564 if (isFullPol()) {
00565 record.setDataSize(4, num_chan_);
00566 transposeMatrix<Float, ExecuteMatrix4>(num_chan_, 0, data_,
00567 record.float_data);
00568 transposeMatrix<Bool, ExecuteMatrix4>(num_chan_, 0, flag_, record.flag);
00569 record.flag_row = anyTrue(flag_row_);
00570
00571 record.num_pol = 4;
00572 record.corr_type = corr_type_;
00573 } else if (isSinglePol0()) {
00574 record.setDataSize(1, num_chan_);
00575 transposeMatrix<Float, ExecuteMatrix1>(num_chan_, 0, data_,
00576 record.float_data);
00577 transposeMatrix<Bool, ExecuteMatrix1>(num_chan_, 0, flag_, record.flag);
00578 record.flag_row = flag_row_[0];
00579
00580 record.num_pol = 1;
00581 record.corr_type[0] = corr_type_[0];
00582 } else {
00583 return false;
00584 }
00585
00586 POST_END;
00587 return true;
00588 }
00589
00590 bool getLinpol(MSDataRecord &record) {
00591 POST_START;
00592
00593 record.setFloat();
00594 if (isDualPol()) {
00595
00596 record.setDataSize(2, num_chan_);
00597 transposeMatrix<Float, ExecuteMatrix2>(num_chan_, 0, data_,
00598 record.float_data);
00599 transposeMatrix<Bool, ExecuteMatrix2>(num_chan_, 0, flag_, record.flag);
00600 record.flag_row = flag_row_[0] || flag_row_[1];
00601
00602 record.num_pol = 2;
00603 record.corr_type = corr_type_;
00604 } else if (isSinglePol0()) {
00605 record.setDataSize(1, num_chan_);
00606 transposeMatrix<Float, ExecuteMatrix1>(num_chan_, 0, data_,
00607 record.float_data);
00608 transposeMatrix<Bool, ExecuteMatrix1>(num_chan_, 0, flag_, record.flag);
00609 record.flag_row = flag_row_[0];
00610
00611 record.num_pol = 1;
00612 record.corr_type[0] = corr_type_[0];
00613 } else {
00614 return false;
00615 }
00616
00617 POST_END;
00618 return true;
00619 }
00620
00621 uInt getNumPolLinear() const {
00622 if (isFullPol()) {
00623 return 4;
00624 } else if (isDualPol()) {
00625 return 2;
00626 } else if (isSinglePol0() || isSinglePol1()) {
00627 return 1;
00628 } else {
00629 return 0;
00630 }
00631 }
00632
00633 uInt getNumPolCircular() const {
00634 return getNumPolLinear();
00635 }
00636
00637 uInt getNumPolStokes() const {
00638 if (isFullPol()) {
00639 return 4;
00640 } else if (isSinglePol0()) {
00641 return 1;
00642 } else {
00643 return 0;
00644 }
00645 }
00646
00647 uInt getNumPolLinpol() const {
00648 if (isDualPol()) {
00649 return 2;
00650 } else if (isSinglePol0()) {
00651 return 1;
00652 } else {
00653 return 0;
00654 }
00655 }
00656 };
00657
00658 class DataAccumulator {
00659 private:
00660 struct DataAccumulatorKey {
00661 Int antenna_id;
00662 Int field_id;
00663 Int feed_id;
00664 Int spw_id;
00665 String pol_type;
00666 String intent;
00667
00668 template<class T, class C>
00669 bool comp(T const &a, T const &b, C const &c) const {
00670 if (a < b) {
00671 return true;
00672 } else if (a == b) {
00673 return c();
00674 } else {
00675 return false;
00676 }
00677 }
00678
00679 bool operator()(DataAccumulatorKey const &lhs,
00680 DataAccumulatorKey const &rhs) const {
00681 return comp(lhs.antenna_id, rhs.antenna_id,
00682 [&]() {return comp(lhs.field_id, rhs.field_id,
00683 [&]() {return comp(lhs.feed_id, rhs.feed_id,
00684 [&]() {return comp(lhs.spw_id, rhs.spw_id,
00685 [&]() {return comp(lhs.pol_type, rhs.pol_type,
00686 [&]() {return comp(lhs.intent, rhs.intent,
00687 []() {return false;});});});});});});
00688 }
00689 };
00690
00691 public:
00692 DataAccumulator() :
00693 pool_(), antenna_id_(), spw_id_(), field_id_(), feed_id_(), scan_(),
00694 subscan_(), intent_(), direction_(), interval_(), indexer_(), time_(-1.0),
00695 is_free_() {
00696 }
00697
00698 virtual ~DataAccumulator() {
00699 POST_START;
00700
00701 for (auto iter = pool_.begin(); iter != pool_.end(); ++iter) {
00702 delete *iter;
00703 }
00704
00705 POST_END;
00706 }
00707
00708 size_t getNumberOfChunks() const {
00709 return pool_.size();
00710 }
00711
00712 size_t getNumberOfActiveChunks() const {
00713 return std::count_if(pool_.begin(), pool_.end(), [](DataChunk * const &c) {
00714 return c->getNumPol() > 0;
00715 });
00716 }
00717
00718 bool queryForGet(DataRecord const &record) const {
00719 Double const time = record.time;
00720 bool is_ready = (0.0 <= time_) && !(time_ == time);
00721 return is_ready;
00722 }
00723
00724 bool queryForGet(Double const &time) const {
00725 bool is_ready = (0.0 <= time_) && !(time_ == time);
00726 return is_ready;
00727 }
00728
00729 void clear() {
00730 for (auto iter = pool_.begin(); iter != pool_.end(); ++iter) {
00731 (*iter)->clear();
00732 }
00733 time_ = -1.0;
00734 }
00735
00736 bool get(size_t ichunk, MSDataRecord &record) {
00737 POST_START;
00738
00739 if (pool_.size() == 0) {
00740 return false;
00741 } else if (ichunk >= pool_.size()) {
00742 return false;
00743 }
00744 bool status = pool_[ichunk]->get(record);
00745
00746 if (!status) {
00747 record.clear();
00748 return status;
00749 }
00750 record.time = time_;
00751 record.pol_type = pool_[ichunk]->getPolType();
00752 record.antenna_id = antenna_id_[ichunk];
00753 record.spw_id = spw_id_[ichunk];
00754 record.field_id = field_id_[ichunk];
00755 record.feed_id = feed_id_[ichunk];
00756 record.scan = scan_[ichunk];
00757 record.subscan = subscan_[ichunk];
00758 record.intent = intent_[ichunk];
00759 record.direction = direction_[ichunk];
00760 record.interval = interval_[ichunk];
00761 record.temperature = temperature_[ichunk];
00762 record.pressure = pressure_[ichunk];
00763 record.rel_humidity = rel_humidity_[ichunk];
00764 record.wind_speed = wind_speed_[ichunk];
00765 record.wind_direction = wind_direction_[ichunk];
00766
00767 POST_END;
00768 return status;
00769 }
00770
00771 bool accumulate(DataRecord const &record) {
00772 POST_START;
00773
00774 if (!isValidRecord(record)) {
00775
00776 return false;
00777 }
00778
00779 Double time = record.time;
00780 if (time_ < 0.0) {
00781 time_ = time;
00782 }
00783 if (time_ != time) {
00784
00785 return false;
00786 }
00787 Int antennaid = record.antenna_id;
00788 Int spwid = record.spw_id;
00789 Int fieldid = record.field_id;
00790 Int feedid = record.feed_id;
00791 Int scan = record.scan;
00792 Int subscan = record.subscan;
00793 String intent = record.intent;
00794 String poltype = record.pol_type;
00795 DataAccumulatorKey key;
00796 key.antenna_id = record.antenna_id;
00797 key.field_id = record.field_id;
00798 key.feed_id = record.feed_id;
00799 key.spw_id = record.spw_id;
00800 key.intent = record.intent;
00801 key.pol_type = record.pol_type;
00802 Matrix<Double> const &direction = record.direction;
00803 Double interval = record.interval;
00804 Float temperature = record.temperature;
00805 Float pressure = record.pressure;
00806 Float rel_humidity = record.rel_humidity;
00807 Float wind_speed = record.wind_speed;
00808 Float wind_direction = record.wind_direction;
00809 bool status = false;
00810 auto iter = indexer_.find(key);
00811 if (iter != indexer_.end()) {
00812 uInt index = iter->second;
00813 status = pool_[index]->accumulate(record);
00814 if (status) {
00815 antenna_id_[index] = antennaid;
00816 spw_id_[index] = spwid;
00817 field_id_[index] = fieldid;
00818 feed_id_[index] = feedid;
00819 scan_[index] = scan;
00820 subscan_[index] = subscan;
00821 intent_[index] = intent;
00822 direction_[index].assign(direction);
00823 interval_[index] = interval;
00824 temperature_[index] = temperature;
00825 pressure_[index] = pressure;
00826 rel_humidity_[index] = rel_humidity;
00827 wind_speed_[index] = wind_speed;
00828 wind_direction_[index] = wind_direction;
00829 }
00830 } else {
00831 pool_.push_back(new DataChunk(poltype));
00832 antenna_id_.push_back(-1);
00833 spw_id_.push_back(-1);
00834 field_id_.push_back(-1);
00835 feed_id_.push_back(-1);
00836 scan_.push_back(-1);
00837 subscan_.push_back(-1);
00838 intent_.push_back("");
00839 direction_.push_back(Matrix<Double>());
00840 interval_.push_back(-1.0);
00841 temperature_.push_back(0.0f);
00842 pressure_.push_back(0.0f);
00843 rel_humidity_.push_back(0.0f);
00844 wind_speed_.push_back(0.0f);
00845 wind_direction_.push_back(0.0f);
00846 uInt index = pool_.size() - 1;
00847 indexer_[key] = index;
00848 status = pool_[index]->accumulate(record);
00849 if (status) {
00850 antenna_id_[index] = antennaid;
00851 spw_id_[index] = spwid;
00852 field_id_[index] = fieldid;
00853 feed_id_[index] = feedid;
00854 scan_[index] = scan;
00855 subscan_[index] = subscan;
00856 intent_[index] = intent;
00857 direction_[index].assign(direction);
00858 interval_[index] = interval;
00859 temperature_[index] = temperature;
00860 pressure_[index] = pressure;
00861 rel_humidity_[index] = rel_humidity;
00862 wind_speed_[index] = wind_speed;
00863 wind_direction_[index] = wind_direction;
00864 }
00865 }
00866
00867
00868
00869
00870
00871
00872 POST_END;
00873 return status;
00874 }
00875
00876 String getPolType(size_t ichunk) const {
00877 assert(ichunk < pool_.size());
00878 return pool_[ichunk]->getPolType();
00879 }
00880
00881 uInt getNumPol(size_t ichunk) const {
00882 assert(ichunk < pool_.size());
00883 return pool_[ichunk]->getNumPol();
00884 }
00885
00886 private:
00887 bool isValidRecord(DataRecord const &record) {
00888
00889
00890
00891
00892 return record.time > 0.0 && record.interval > 0.0 && record.antenna_id >= 0
00893 && record.field_id >= 0 && record.feed_id >= 0 && record.spw_id >= 0
00894 && record.scan >= 0 && record.subscan >= 0 && !record.direction.empty();
00895 }
00896
00897 std::vector<DataChunk *> pool_;
00898 std::vector<Int> antenna_id_;
00899 std::vector<Int> spw_id_;
00900 std::vector<Int> field_id_;
00901 std::vector<Int> feed_id_;
00902 std::vector<Int> scan_;
00903 std::vector<Int> subscan_;
00904 std::vector<String> intent_;
00905 std::vector<Matrix<Double> > direction_;
00906 std::vector<Double> interval_;
00907 std::vector<Float> temperature_;
00908 std::vector<Float> pressure_;
00909 std::vector<Float> rel_humidity_;
00910 std::vector<Float> wind_speed_;
00911 std::vector<Float> wind_direction_;
00912 std::map<DataAccumulatorKey, uInt, DataAccumulatorKey> indexer_;
00913 Double time_;
00914 std::vector<bool> is_free_;
00915 };
00916
00917 }
00918 }
00919
00920 #endif