CubePartitionMixin.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #ifndef CUBE_PARTITION_MIXIN_H_
00028 #define CUBE_PARTITION_MIXIN_H_
00029
00030 #include <synthesis/ImagerObjects/SynthesisUtilMethods.h>
00031 #include <synthesis/ImagerObjects/ParallelImagerParams.h>
00032 #include <synthesis/ImagerObjects/ParamFieldIterator.h>
00033 #include <synthesis/ImagerObjects/MPIGlue.h>
00034 #include <algorithm>
00035 #include <vector>
00036 #include <string>
00037 #include <unistd.h>
00038
00039 namespace casa {
00040
00045 template <class T>
00046 class CubePartitionMixin
00047 : public T {
00048
00049 public:
00050 void concat_images(const string &type) {
00051 LogIO log(LogOrigin("CubePartitionMixin", "concat_images", WHERE));
00052 if (worker_rank >= 0) {
00053 const std::string image_types[] =
00054 {"image", "psf", "model", "residual", "mask", "pb", "weight"};
00055 string cwd(getcwd(nullptr, 0));
00056
00057 MPI_Barrier(worker_comm);
00058
00059
00060 for (uInt i = (uInt)worker_rank;
00061 i < image_parameters.nfields();
00062 i += (uInt)num_workers) {
00063 std::string imagename =
00064 image_parameters.subRecord(i).asString("imagename");
00065 std::string image_prefix = cwd + "/" + imagename;
00066 std::vector<std::string> images;
00067 for (auto ext : image_types) {
00068 images.clear();
00069 std::string ext_suffix = "." + ext;
00070 std::string concat_imagename =
00071 image_prefix + ext_suffix;
00072 for (uInt j = 0; j < (uInt)num_workers; ++j) {
00073 std::string component_imagename =
00074 image_prefix + ".n" + std::to_string(j) + ext_suffix;
00075 if (access(component_imagename.c_str(), F_OK) == 0)
00076 images.push_back(component_imagename);
00077 }
00078 if (!images.empty()) {
00079 std::string cmd = "imageconcat inimages='";
00080 for (auto im : images) cmd += im + " ";
00081 cmd += "' outimage='" + concat_imagename + "' type=";
00082 cmd += type;
00083 int rc = std::system(cmd.c_str());
00084 if (rc == -1 || WEXITSTATUS(rc) != 0)
00085 log << LogIO::WARN
00086 << ("concatenation of " + concat_imagename
00087 + " failed")
00088 << LogIO::POST;
00089 }
00090 }
00091 }
00092 }
00093 };
00094
00095 protected:
00096
00097 MPI_Comm worker_comm;
00098
00099 int num_workers;
00100
00101 int worker_rank;
00102
00103 Record image_parameters;
00104
00105 ParallelImagerParams
00106 get_params(MPI_Comm wcomm, ParallelImagerParams &initial) {
00107
00108
00109 worker_comm = wcomm;
00110 if (worker_comm != MPI_COMM_NULL) {
00111 MPI_Comm_size(worker_comm, &num_workers);
00112 MPI_Comm_rank(worker_comm, &worker_rank);
00113 } else {
00114 num_workers = 0;
00115 worker_rank = -1;
00116 }
00117
00118 std::string cwd(getcwd(nullptr, 0));
00119 std::vector<std::string> all_worker_suffixes;
00120 for (int r = 0; r < num_workers; ++r) {
00121 all_worker_suffixes.push_back(".n" + std::to_string(r));
00122 }
00123 std::string my_worker_suffix =
00124 ((worker_rank >= 0) ? all_worker_suffixes[worker_rank] : "");
00125
00126 image_parameters = Record(initial.image);
00127
00128 SynthesisUtilMethods util;
00129 ParallelImagerParams result;
00130
00131 Record data_image_partition;
00132 if (worker_rank >= 0) {
00133 std::string worker_rank_str = std::to_string(worker_rank);
00134
00135 std::unique_ptr<SynthesisImager> si(new SynthesisImager());
00136 for (uInt i = 0; i < initial.selection.nfields(); ++i) {
00137 SynthesisParamsSelect selection_pars;
00138 selection_pars.fromRecord(initial.selection.subRecord(i));
00139 si->selectData(selection_pars);
00140 }
00141 for (uInt i = 0; i < initial.image.nfields(); ++i) {
00142 String i_name = initial.image.name(i);
00143 if (initial.grid.isDefined(i_name)) {
00144 SynthesisParamsImage image_pars;
00145 image_pars.fromRecord(initial.image.subRecord(i_name));
00146 SynthesisParamsGrid grid_pars;
00147 grid_pars.fromRecord(initial.grid.subRecord(i_name));
00148 si->defineImage(image_pars, grid_pars);
00149 Record csys = si->getcsys();
00150 if (csys.nfields() > 0) {
00151 int nchan = ((image_pars.nchan == -1)
00152 ? si->updateNchan()
00153 : image_pars.nchan);;
00154 Vector<Int> numchans;
00155 Vector<CoordinateSystem> csystems;
00156
00157
00158
00159 data_image_partition.defineRecord(
00160 i_name,
00161 util.cubeDataImagePartition(
00162 initial.selection,
00163 *CoordinateSystem::restore(csys, "coordsys"),
00164 num_workers, nchan, csystems, numchans).
00165 rwSubRecord(worker_rank_str));
00166 }
00167 }
00168 }
00169 }
00170
00171
00172 if (worker_rank >= 0) {
00173 Record sel;
00174 for (uInt i = 0; i < data_image_partition.nfields(); ++i) {
00175 Record &di = data_image_partition.rwSubRecord(i);
00176 for (uInt f = 0; f < di.nfields(); ++f) {
00177 String name = di.name(f);
00178 if (name.find("ms") == 0) {
00179 Record &ms = di.rwSubRecord(f);
00180 if (ms.isDefined("spw") && ms.asString("spw") == "-1")
00181 ms.define("spw", "");
00182 sel.defineRecord(name, ms);
00183 }
00184 }
00185 }
00186 result.selection = sel;
00187 } else {
00188 result.selection = Record();
00189 }
00190
00191
00192 if (worker_rank >= 0) {
00193 result.image = initial.image;
00194 for (uInt i = 0; i < data_image_partition.nfields(); ++i) {
00195 const Record &di = data_image_partition.subRecord(i);
00196 String i_name = data_image_partition.name(i);
00197 Record &i_image = result.image.rwSubRecord(i_name);
00198 i_image.define("csys", di.asString("coordsys"));
00199 i_image.define("nchan", di.asString("nchan"));
00200 i_image.define(
00201 "imagename",
00202 i_image.asString("imagename") + String(my_worker_suffix));
00203 }
00204 } else {
00205 result.image = empty_fields(initial.image, "imagename");
00206 }
00207
00208
00209
00210
00211 if (worker_rank >= 0) {
00212 auto modify_cfcache = [&](const char *field_val) {
00213 return *field_val + my_worker_suffix;
00214 };
00215 result.grid =
00216 convert_fields(initial.grid, "cfcache", modify_cfcache);
00217 } else {
00218 result.grid = empty_fields(initial.grid, "cfcache");
00219 }
00220
00221
00222 result.normalization =
00223 ((worker_rank >= 0) ? initial.normalization : Record());
00224
00225
00226 result.deconvolution =
00227 ((worker_rank >= 0) ? initial.deconvolution : Record());
00228
00229
00230 result.weight =
00231 ((worker_rank >= 0) ? initial.weight : Record());
00232
00233
00234 result.iteration = initial.iteration;
00235
00236 return result;
00237 }
00238
00239 private:
00240
00241
00242 Record convert_fields(Record &rec, const char *field,
00243 std::function<std::string(const char *)> fn) {
00244 auto modify_field_val = [&](Record &msRec) {
00245 msRec.define(field, fn(msRec.asString(field).c_str()));
00246 };
00247 Record result(rec);
00248 for_each(ParamFieldIterator::begin(&result),
00249 ParamFieldIterator::end(&result),
00250 modify_field_val);
00251 return result;
00252 }
00253
00254
00255 Record empty_fields(Record &rec, const char *field) {
00256 auto modify_field_val = [&](Record &msRec) {
00257 msRec.defineRecord(field, Record());
00258 };
00259 Record result(rec);
00260 for_each(ParamFieldIterator::begin(&result),
00261 ParamFieldIterator::end(&result),
00262 modify_field_val);
00263 return result;
00264 }
00265 };
00266
00267 }
00268
00269 #endif // CUBE_PARTITION_MIXIN_H_