ContinuumPartitionMixin.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #ifndef CONTINUUM_PARTITION_MIXIN_H_
00028 #define CONTINUUM_PARTITION_MIXIN_H_
00029
00030 #include <synthesis/ImagerObjects/SynthesisUtilMethods.h>
00031 #include <synthesis/ImagerObjects/ParallelImagerParams.h>
00032 #include <synthesis/ImagerObjects/MultiParamFieldIterator.h>
00033 #include <synthesis/ImagerObjects/MPIGlue.h>
00034 #include <algorithm>
00035 #include <unistd.h>
00036 #include <vector>
00037 #include <string>
00038
00039 namespace casa {
00040
00045 template <class T>
00046 class ContinuumPartitionMixin
00047 : public T {
00048
00049 public:
00050 void concat_images(const std::string &type __attribute__((unused))) {};
00051
00052 protected:
00053 MPI_Comm worker_comm;
00054
00055 int num_workers;
00056
00057 int worker_rank;
00058
00059 ParallelImagerParams
00060 get_params(MPI_Comm wcomm, ParallelImagerParams &initial) {
00061
00062
00063 worker_comm = wcomm;
00064 if (worker_comm != MPI_COMM_NULL) {
00065 MPI_Comm_size(worker_comm, &num_workers);
00066 MPI_Comm_rank(worker_comm, &worker_rank);
00067 } else {
00068 num_workers = 0;
00069 worker_rank = -1;
00070 }
00071
00072 std::string cwd(getcwd(nullptr, 0));
00073 std::vector<std::string> all_worker_suffixes;
00074 for (int r = 0; r < num_workers; ++r)
00075 all_worker_suffixes.push_back(".n" + std::to_string(r));
00076 std::string my_worker_suffix =
00077 ((num_workers > 1 && worker_rank >= 0)
00078 ? all_worker_suffixes[worker_rank]
00079 : "");
00080 SynthesisUtilMethods util;
00081 ParallelImagerParams result;
00082
00083
00084 result.selection =
00085 ((worker_rank >= 0)
00086 ? util.continuumDataPartition(initial.selection, num_workers).
00087 rwSubRecord(std::to_string(worker_rank))
00088 : Record());
00089
00090
00091 if (worker_rank >= 0) {
00092 auto modify_imagename = [&](const char *field_val) {
00093 return cwd + "/" + field_val + my_worker_suffix;
00094 };
00095 result.image = convert_fields(initial.image, "imagename",
00096 modify_imagename);
00097 } else {
00098 result.image = empty_fields(initial.image, "imagename");
00099 }
00100
00101
00102 if (worker_rank >= 0) {
00103 auto modify_cfcache = [&](const char *field_val) {
00104 return field_val + my_worker_suffix;
00105 };
00106 result.grid =
00107 convert_fields(initial.grid, "cfcache", modify_cfcache);
00108 } else {
00109 result.grid = empty_fields(initial.grid, "cfcache");
00110 }
00111
00112
00113 if (worker_rank == 0 && num_workers > 1) {
00114 auto accumulate_part_names =
00115 [&] (std::array<Record *,2> im_norm_par) {
00116 vector<String> part_names;
00117 std::string image_path =
00118 cwd + "/" + im_norm_par[0]->asString("imagename").c_str();
00119 for (auto s : all_worker_suffixes) {
00120 part_names.push_back(String(image_path + s));
00121 }
00122 im_norm_par[1]->define("partimagenames", Vector<String>(part_names));
00123 };
00124 std::array<Record *,2> im_norm_params =
00125 { &initial.image, &initial.normalization };
00126 std::for_each(MultiParamFieldIterator<2>::begin(im_norm_params),
00127 MultiParamFieldIterator<2>::end(im_norm_params),
00128 accumulate_part_names);
00129 result.normalization = *(im_norm_params[1]);
00130 } else {
00131 result.normalization =
00132 empty_fields(initial.normalization, "partimagenames");
00133 }
00134
00135
00136 result.deconvolution =
00137 ((worker_rank == 0) ? initial.deconvolution : Record());
00138
00139
00140 result.weight = ((worker_rank >= 0) ? initial.weight : Record());
00141
00142
00143 result.iteration = initial.iteration;
00144
00145 return result;
00146 }
00147
00148 private:
00149
00150
00151 Record convert_fields(Record &rec, const char *field,
00152 std::function<std::string(const char *)> fn) {
00153 auto modify_field_val = [&](Record &msRec) {
00154 msRec.define(field, fn(msRec.asString(field).c_str()));
00155 };
00156 Record result(rec);
00157 std::for_each(ParamFieldIterator::begin(&result),
00158 ParamFieldIterator::end(&result),
00159 modify_field_val);
00160 return result;
00161 }
00162
00163
00164 Record empty_fields(Record &rec, const char *field) {
00165 auto modify_field_val = [&](Record &msRec) {
00166 msRec.defineRecord(field, Record());
00167 };
00168 Record result(rec);
00169 std::for_each(ParamFieldIterator::begin(&result),
00170 ParamFieldIterator::end(&result),
00171 modify_field_val);
00172 return result;
00173 }
00174 };
00175
00176 }
00177
00178 #endif // CONTINUUM_PARTITION_MIXIN_H_