CubePartitionMixin.h

Go to the documentation of this file.
00001 /* -*- mode: c++ -*- */
00002 //# CubePartitionMixin.h: Parallel cube imaging data partitioning
00003 //# Copyright (C) 2016
00004 //# Associated Universities, Inc. Washington DC, USA.
00005 //#
00006 //# This library is free software; you can redistribute it and/or modify it
00007 //# under the terms of the GNU Library General Public License as published by
00008 //# the Free Software Foundation; either version 2 of the License, or (at your
00009 //# option) any later version.
00010 //#
00011 //# This library is distributed in the hope that it will be useful, but WITHOUT
00012 //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00013 //# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
00014 //# License for more details.
00015 //#
00016 //# You should have received a copy of the GNU Library General Public License
00017 //# along with this library; if not, write to the Free Software Foundation,
00018 //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
00019 //#
00020 //# Correspondence concerning AIPS++ should be addressed as follows:
00021 //#        Internet email: aips2-request@nrao.edu.
00022 //#        Postal address: AIPS++ Project Office
00023 //#                        National Radio Astronomy Observatory
00024 //#                        520 Edgemont Road
00025 //#                        Charlottesville, VA 22903-2475 USA
00026 //#
00027 #ifndef CUBE_PARTITION_MIXIN_H_
00028 #define CUBE_PARTITION_MIXIN_H_
00029 
00030 #include <synthesis/ImagerObjects/SynthesisUtilMethods.h>
00031 #include <synthesis/ImagerObjects/ParallelImagerParams.h>
00032 #include <synthesis/ImagerObjects/ParamFieldIterator.h>
00033 #include <synthesis/ImagerObjects/MPIGlue.h>
00034 #include <algorithm>
00035 #include <vector>
00036 #include <string>
00037 #include <unistd.h>
00038 
00039 namespace casa {
00040 
00045 template <class T>
00046 class CubePartitionMixin
00047         : public T {
00048 
00049 public:
00050         void concat_images(const string &type) {
00051                 LogIO log(LogOrigin("CubePartitionMixin", "concat_images", WHERE));
00052                 if (worker_rank >= 0) {
00053                         const std::string image_types[] =
00054                                 {"image", "psf", "model", "residual", "mask", "pb", "weight"};
00055                         string cwd(getcwd(nullptr, 0));
00056                         // wait until all ranks have completed file modifications
00057                         MPI_Barrier(worker_comm);
00058                         // round-robin allocation of image concatenation tasks to worker
00059                         // ranks
00060                         for (uInt i = (uInt)worker_rank;
00061                              i < image_parameters.nfields();
00062                              i += (uInt)num_workers) {
00063                                 std::string imagename =
00064                                         image_parameters.subRecord(i).asString("imagename");
00065                                 std::string image_prefix = cwd + "/" + imagename;
00066                                 std::vector<std::string> images;
00067                                 for (auto ext : image_types) {
00068                                         images.clear();
00069                                         std::string ext_suffix = "." + ext;
00070                                         std::string concat_imagename =
00071                                                 image_prefix + ext_suffix;
00072                                         for (uInt j = 0; j < (uInt)num_workers; ++j) {
00073                                                 std::string component_imagename =
00074                                                         image_prefix + ".n" + std::to_string(j) + ext_suffix;
00075                                                 if (access(component_imagename.c_str(), F_OK) == 0)
00076                                                         images.push_back(component_imagename);
00077                                         }
00078                                         if (!images.empty()) {
00079                                                 std::string cmd = "imageconcat inimages='";
00080                                                 for (auto im : images) cmd += im + " ";
00081                                                 cmd += "' outimage='" + concat_imagename + "' type=";
00082                                                 cmd += type;
00083                                                 int rc = std::system(cmd.c_str());
00084                                                 if (rc == -1 || WEXITSTATUS(rc) != 0)
00085                                                         log << LogIO::WARN
00086                                                             << ("concatenation of " + concat_imagename
00087                                                                 + " failed")
00088                                                             << LogIO::POST;
00089                                         }
00090                                 }
00091                         }
00092                 }
00093         };
00094 
00095 protected:
00096 
00097         MPI_Comm worker_comm;
00098 
00099         int num_workers;
00100 
00101         int worker_rank;
00102 
00103         Record image_parameters;
00104 
00105         ParallelImagerParams
00106         get_params(MPI_Comm wcomm, ParallelImagerParams &initial) {
00107 
00108                 // Determine rank among parallel imaging worker processes.
00109                 worker_comm = wcomm;
00110                 if (worker_comm != MPI_COMM_NULL) {
00111                         MPI_Comm_size(worker_comm, &num_workers);
00112                         MPI_Comm_rank(worker_comm, &worker_rank);
00113                 } else {
00114                         num_workers = 0;
00115                         worker_rank = -1;
00116                 }
00117 
00118                 std::string cwd(getcwd(nullptr, 0));
00119                 std::vector<std::string> all_worker_suffixes;
00120                 for (int r = 0; r < num_workers; ++r) {
00121                         all_worker_suffixes.push_back(".n" + std::to_string(r));
00122                 }
00123                 std::string my_worker_suffix =
00124                         ((worker_rank >= 0) ? all_worker_suffixes[worker_rank] : "");
00125 
00126                 image_parameters = Record(initial.image);
00127 
00128                 SynthesisUtilMethods util;
00129                 ParallelImagerParams result;
00130 
00131                 Record data_image_partition;
00132                 if (worker_rank >= 0) {
00133                         std::string worker_rank_str = std::to_string(worker_rank);
00134                         // need a SynthesisImager instance to do cube partitioning
00135                         std::unique_ptr<SynthesisImager> si(new SynthesisImager());
00136                         for (uInt i = 0; i < initial.selection.nfields(); ++i) {
00137                                 SynthesisParamsSelect selection_pars;
00138                                 selection_pars.fromRecord(initial.selection.subRecord(i));
00139                                 si->selectData(selection_pars);
00140                         }
00141                         for (uInt i = 0; i < initial.image.nfields(); ++i) {
00142                                 String i_name = initial.image.name(i);
00143                                 if (initial.grid.isDefined(i_name)) {
00144                                         SynthesisParamsImage image_pars;
00145                                         image_pars.fromRecord(initial.image.subRecord(i_name));
00146                                         SynthesisParamsGrid grid_pars;
00147                                         grid_pars.fromRecord(initial.grid.subRecord(i_name));
00148                                         si->defineImage(image_pars, grid_pars);
00149                                         Record csys = si->getcsys();
00150                                         if (csys.nfields() > 0) {
00151                                                 int nchan = ((image_pars.nchan == -1)
00152                                                              ? si->updateNchan()
00153                                                              : image_pars.nchan);;
00154                                                 Vector<Int> numchans;
00155                                                 Vector<CoordinateSystem> csystems;
00156                                                 // save only that part of the record returned from
00157                                                 // util.cubeDataImagePartition that is handled by the
00158                                                 // current rank
00159                                                 data_image_partition.defineRecord(
00160                                                         i_name,
00161                                                         util.cubeDataImagePartition(
00162                                                                 initial.selection,
00163                                                                 *CoordinateSystem::restore(csys, "coordsys"),
00164                                                                 num_workers, nchan, csystems, numchans).
00165                                                         rwSubRecord(worker_rank_str));
00166                                         }
00167                                 }
00168                         }
00169                 }
00170 
00171                 // selection_params
00172                 if (worker_rank >= 0) {
00173                         Record sel;
00174                         for (uInt i = 0; i < data_image_partition.nfields(); ++i) {
00175                                 Record &di = data_image_partition.rwSubRecord(i);
00176                                 for (uInt f = 0; f < di.nfields(); ++f) {
00177                                         String name = di.name(f);
00178                                         if (name.find("ms") == 0) {
00179                                                 Record &ms = di.rwSubRecord(f);
00180                                                 if (ms.isDefined("spw") && ms.asString("spw") == "-1")
00181                                                         ms.define("spw", "");
00182                                                 sel.defineRecord(name, ms);
00183                                         }
00184                                 }
00185                         }
00186                         result.selection = sel;
00187                 } else {
00188                         result.selection = Record();
00189                 }
00190 
00191                 // image_params
00192                 if (worker_rank >= 0) {
00193                         result.image = initial.image;
00194                         for (uInt i = 0; i < data_image_partition.nfields(); ++i) {
00195                                 const Record &di = data_image_partition.subRecord(i);
00196                                 String i_name = data_image_partition.name(i);
00197                                 Record &i_image = result.image.rwSubRecord(i_name);
00198                                 i_image.define("csys", di.asString("coordsys"));
00199                                 i_image.define("nchan", di.asString("nchan"));
00200                                 i_image.define(
00201                                         "imagename",
00202                                         i_image.asString("imagename") + String(my_worker_suffix));
00203                         }
00204                 } else {
00205                         result.image = empty_fields(initial.image, "imagename");
00206                 }
00207 
00208                 // FIXME: are grid parameters partitioned by node?
00209                 //
00210                 // grid params
00211                 if (worker_rank >= 0) {
00212                         auto modify_cfcache = [&](const char *field_val) {
00213                                 return *field_val + my_worker_suffix;
00214                         };
00215                         result.grid =
00216                                 convert_fields(initial.grid, "cfcache", modify_cfcache);
00217                 } else {
00218                         result.grid = empty_fields(initial.grid, "cfcache");
00219                 }
00220 
00221                 // normalization_params
00222                 result.normalization =
00223                         ((worker_rank >= 0) ? initial.normalization : Record());
00224 
00225                 // deconvolution params
00226                 result.deconvolution =
00227                         ((worker_rank >= 0) ? initial.deconvolution : Record());
00228 
00229                 // weight params
00230                 result.weight =
00231                         ((worker_rank >= 0) ? initial.weight : Record());
00232 
00233                 // iteration params
00234                 result.iteration = initial.iteration;
00235 
00236                 return result;
00237         }
00238 
00239 private:
00240 
00241         // Convenience method to transform certain record fields
00242         Record convert_fields(Record &rec, const char *field,
00243                               std::function<std::string(const char *)> fn) {
00244                 auto modify_field_val = [&](Record &msRec) {
00245                         msRec.define(field, fn(msRec.asString(field).c_str()));
00246                 };
00247                 Record result(rec);
00248                 for_each(ParamFieldIterator::begin(&result),
00249                          ParamFieldIterator::end(&result),
00250                          modify_field_val);
00251                 return result;
00252         }
00253 
00254         // Convenience method to clear certain record fields
00255         Record empty_fields(Record &rec, const char *field) {
00256                 auto modify_field_val = [&](Record &msRec) {
00257                         msRec.defineRecord(field, Record());
00258                 };
00259                 Record result(rec);
00260                 for_each(ParamFieldIterator::begin(&result),
00261                          ParamFieldIterator::end(&result),
00262                          modify_field_val);
00263                 return result;
00264         }
00265 };
00266 
00267 } // namespace casa
00268 
00269 #endif // CUBE_PARTITION_MIXIN_H_
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 31 Aug 2016 for casa by  doxygen 1.6.1