//   GAMBIT: Global and Modular BSM Inference Tool
//   *********************************************
///
///  \file hdf5_combine_tools.cpp
///
///  Greg's code for combining HDF5 output of
///  multiple MPI processes.

// NOTE: standard headers below are inferred from usage in this file; the
// GAMBIT header paths are assumptions based on the symbols referenced here
// (printer_error, LOCAL_INFO, Utils::*, the HDF5 wrappers and
// DataSetInterfaceScalar), not confirmed paths.
#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "gambit/Printers/printers/hdf5printer/hdf5_combine_tools.hpp"
#include "gambit/Printers/printers/hdf5printer/DataSetInterfaceScalar.hpp"
#include "gambit/Printers/printers/hdf5printer/hdf5tools.hpp"
#include "gambit/Utils/local_info.hpp"
#include "gambit/Utils/util_functions.hpp"

// Flag to trigger debug output
//#define COMBINE_DEBUG
namespace Gambit
{
  namespace Printers
  {
    namespace HDF5
    {

      // Return the number of links (objects) in a group
      inline hsize_t getGroupNum(hid_t group_id)
      {
        H5G_info_t group_info;
        H5Gget_info(group_id, &group_info);
        return group_info.nlinks;
      }

      // Return the native type of a dataset (the caller is responsible for closing the returned type)
      inline hid_t getType(hid_t dataset)
      {
        hid_t dtype = H5Dget_type(dataset);
        hid_t ret = H5Tget_native_type(dtype, H5T_DIR_DESCEND);
        H5Tclose(dtype);
        return ret;
      }

      // Get the name of the ith temp file
      std::string hdf5_stuff::get_fname(const size_t i)
      {
        std::stringstream ss;
        if(custom_mode)
        {
          ss << file_names.at(i);
        }
        else
        {
          ss << root_file_name << "_temp_" << i;
        }
        return ss.str();
      }
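
      // For example (illustrative names only): with root_file_name ==
      // "runs/scan.hdf5" and custom_mode == false, get_fname(3) returns
      // "runs/scan.hdf5_temp_3"; in custom mode it simply returns file_names.at(3).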

      // Discover the names of datasets within an HDF5 group.
      // Skips the "_isvalid" flag datasets and the RA index datasets.
      std::vector<std::string> get_dset_names(hid_t group_id)
      {
        std::vector<std::string> outnames;
        H5G_info_t group_info;
        H5Gget_info(group_id, &group_info);
        hsize_t nlinks = group_info.nlinks; // Number of objects in the group
        for(hsize_t i=0; i<nlinks; i++)
        {
          int otype = H5Gget_objtype_by_idx(group_id, i);
          switch (otype)
          {
            case H5G_DATASET:
            {
              char name_buffer[1000];
              H5Gget_objname_by_idx(group_id, i, name_buffer, (size_t)1000);
              std::string strname(name_buffer);
              if (strname.find("_isvalid") == std::string::npos && strname != "RA_pointID" && strname != "RA_MPIrank")
                outnames.push_back(strname);
              break;
            }
            default:
              break;
          }
        }
        return outnames;
      }
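
      // For example, a group containing {"LogLike", "LogLike_isvalid",
      // "RA_pointID", "RA_MPIrank"} yields just {"LogLike"}: the validity
      // flags and the random-access index datasets are filtered out.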

      // H5Literate callback: collect the dataset names in a group, skipping
      // "_isvalid" datasets. (Currently unused; get_dset_names is used instead.)
      inline herr_t op_func (hid_t loc_id, const char *name_in, const H5L_info_t *,
                             void *operator_data)
      {
        herr_t return_val = 0;
        H5O_info_t infobuf;
        std::vector<std::string> &od = *static_cast<std::vector<std::string> *> (operator_data);
        std::string name(name_in);

        H5Oget_info_by_name (loc_id, name.c_str(), &infobuf, H5P_DEFAULT);

        switch (infobuf.type)
        {
          case H5O_TYPE_GROUP:
            break;
          case H5O_TYPE_DATASET:
          {
            if (name.find("_isvalid") == std::string::npos)
              od.push_back(name);
            break;
          }
          case H5O_TYPE_NAMED_DATATYPE:
            break;
          default:
            break;
        }

        return return_val;
      }
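
      // Usage sketch: op_func is written as an H5Literate callback. Given an
      // open group id, the following would collect the qualifying dataset
      // names into 'names':
      //
      //   std::vector<std::string> names;
      //   H5Literate(group_id, H5_INDEX_NAME, H5_ITER_NATIVE, NULL, op_func, (void *) &names);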

      // As op_func, but additionally skips the RA index datasets
      // ("RA_pointID" and "RA_MPIrank"). Also currently unused.
      inline herr_t op_func_aux (hid_t loc_id, const char *name_in, const H5L_info_t *,
                                 void *operator_data)
      {
        herr_t return_val = 0;
        H5O_info_t infobuf;
        std::vector<std::string> &od = *static_cast<std::vector<std::string> *> (operator_data);
        std::string name(name_in);

        H5Oget_info_by_name (loc_id, name.c_str(), &infobuf, H5P_DEFAULT);

        switch (infobuf.type)
        {
          case H5O_TYPE_GROUP:
            break;
          case H5O_TYPE_DATASET:
          {
            if (name.find("_isvalid") == std::string::npos && name != "RA_pointID" && name != "RA_MPIrank")
              od.push_back(name);
            break;
          }
          case H5O_TYPE_NAMED_DATATYPE:
            break;
          default:
            break;
        }

        return return_val;
      }

      // Create a (dataset, dataset_isvalid) pair of length size_tot in the
      // output group, ready to receive copied points.
      inline void setup_hdf5_points(hid_t new_group, hid_t type, hid_t type2, unsigned long long size_tot, const std::string &name)
      {
        #ifdef COMBINE_DEBUG
        std::cerr << " Creating dataset '"<<name<<"'" << std::endl;
        #endif

        hsize_t dimsf[1];
        dimsf[0] = size_tot;
        hid_t dataspace = H5Screate_simple(1, dimsf, NULL);
        if(dataspace < 0)
        {
          std::ostringstream errmsg;
          errmsg<<"Failed to set up HDF5 points for copying. H5Screate_simple failed for dataset ("<<name<<").";
          printer_error().raise(LOCAL_INFO, errmsg.str());
        }
        hid_t dataset_out = H5Dcreate2(new_group, name.c_str(), type, dataspace, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
        if(dataset_out < 0)
        {
          std::ostringstream errmsg;
          errmsg<<"Failed to set up HDF5 points for copying. H5Dcreate2 failed for dataset ("<<name<<").";
          printer_error().raise(LOCAL_INFO, errmsg.str());
        }
        hid_t dataspace2 = H5Screate_simple(1, dimsf, NULL);
        if(dataspace2 < 0)
        {
          std::ostringstream errmsg;
          errmsg<<"Failed to set up HDF5 points for copying. H5Screate_simple failed for dataset ("<<name<<"_isvalid).";
          printer_error().raise(LOCAL_INFO, errmsg.str());
        }
        hid_t dataset2_out = H5Dcreate2(new_group, (name + "_isvalid").c_str(), type2, dataspace2, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
        if(dataset2_out < 0)
        {
          std::ostringstream errmsg;
          errmsg<<"Failed to set up HDF5 points for copying. H5Dcreate2 failed for dataset ("<<name<<"_isvalid).";
          printer_error().raise(LOCAL_INFO, errmsg.str());
        }

        // Just close the newly created datasets; they are reopened as needed.
        HDF5::closeSpace(dataspace);
        HDF5::closeSpace(dataspace2);
        HDF5::closeDataset(dataset_out);
        HDF5::closeDataset(dataset2_out);
      }
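
      // Every quantity in the output is stored as such a pair: e.g. for a
      // parameter "LogLike" (an illustrative name), this creates the 1D
      // datasets "LogLike" and "LogLike_isvalid", both of length size_tot,
      // where the _isvalid flags mark which slots hold real data.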

      // Split a "/"-delimited group path into its component names.
      inline std::vector<std::string> getGroups(std::string groups)
      {
        std::string::size_type pos = groups.find_first_of("/");
        while (pos != std::string::npos)
        {
          groups[pos] = ' ';
          pos = groups.find_first_of("/", pos + 1);
        }

        std::stringstream ss;
        ss << groups;
        std::vector<std::string> ret;
        std::string temp;
        while (ss >> temp) ret.push_back(temp);

        return ret;
      }
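
      // For example, getGroups("/data/RA") returns {"data", "RA"}: slashes
      // become spaces and the stream extraction drops empty tokens, so
      // leading or repeated slashes are harmless.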

      hdf5_stuff::hdf5_stuff(const std::string &file_name, const std::string &output_file, const std::string &group_name, const size_t num, const bool cleanup, const bool skip, const std::vector<std::string>& file_names_in)
        : group_name(group_name)
        , cum_sizes(num, 0)
        , sizes(num, 0)
        , size_tot(0)
        , root_file_name(file_name)
        , do_cleanup(cleanup)
        , skip_unreadable(skip)
        , files(num,-1)
        , groups(num,-1)
        , aux_groups(num,-1)
        , file_names(file_names_in)
        , custom_mode(file_names.size()>0) // Mode which allows 'custom' filenames (for combining output from multiple runs)
      {
        if(custom_mode)
        {
          if(num!=file_names.size())
          {
            std::ostringstream errmsg;
            errmsg << " Number of files to be combined (num="<<num<<") does not match the length of the file list supplied (file_names.size()="<<file_names.size()<<"). These should be the same!"<<std::endl;
            printer_error().raise(LOCAL_INFO, errmsg.str());
          }
          if(num<2)
          {
            std::ostringstream errmsg;
            errmsg << " Number of files to be combined (num="<<num<<") is less than two, so there is no combining to be done!"<<std::endl;
            printer_error().raise(LOCAL_INFO, errmsg.str());
          }
          std::cout << " Running combination routines in 'custom' mode. Primary datasets from all specified files (in the specified group) will be concatenated. Auxiliary (\"random access\") datasets will be IGNORED! If you need auxiliary datasets to be merged into the primary datasets then please merge them in 'normal' mode." << std::endl;
          // TODO: It would be good to write out an extra dataset recording which points come from which files. It could just be an int, plus a txt file giving the mapping from indices to input files. This is too much work for now though.
        }

        // Check which files are readable (lets us tell the user about the whole bad batch at once)
        bool badfiles = false;
        std::vector<std::size_t> unreadable;
        std::cout << " Checking readability of temp files... "<<std::endl;
        for (size_t i = 0; i < num; i++)
        {
          std::string fname = get_fname(i);
          if(not HDF5::checkFileReadable(fname))
          {
            badfiles = true;
            unreadable.push_back(i);
          }
        }
        if(badfiles)
        {
          std::cerr << " WARNING: Unreadable temporary HDF5 files detected! Indices were "<<unreadable<<std::endl;

          if(not skip_unreadable)
          {
            std::ostringstream errmsg;
            errmsg << " Unreadable temp files detected! Please set the -f flag if you want to ignore them."<<std::endl;
            printer_error().raise(LOCAL_INFO, errmsg.str());
          }
        }

        // Check readability of the old combined file, if one is present
        if(Utils::file_exists(output_file))
        {
          if(not HDF5::checkFileReadable(output_file))
          {
            std::ostringstream errmsg;
            errmsg << "Error combining HDF5 temporary data! A previous combined output file was found ("<<output_file<<"), but it could not be successfully opened. It may be corrupted due to a bad shutdown. You could try deleting/moving the old combined data file and attempting the combination again, though of course the old combined data will be lost." << std::endl;
            printer_error().raise(LOCAL_INFO, errmsg.str());
          }
        }

        // Loop over the temporary files from each rank and perform some setup computations.
        for (size_t i = 0; i < num; i++)
        {
          // Simple progress monitor
          std::cout << " Scanning temp file "<<i<<" for datasets... \r"<<std::flush;

          std::string fname = get_fname(i);
          hid_t file_id = -1, group_id = -1, aux_group_id = -1; // initialised invalid; set below where opening succeeds
          hssize_t size = 0; // Length of datasets in this file (to be measured)
          if(HDF5::checkFileReadable(fname))
          {
            file_id = HDF5::openFile(fname);
          }
          // (In custom mode aux_group_id simply stays at -1, so any aux stuff is skipped.)

          if(file_id<0 and not skip)
          {
            // Can't read the file (it is missing or corrupted) and we are not allowed to skip it, so error
            std::ostringstream errmsg;
            errmsg << "Error combining temporary HDF5 output! Failed to open expected temporary file '"<<fname<<"'.";
            printer_error().raise(LOCAL_INFO, errmsg.str());
          }

          if(file_id >= 0)
          {
            group_id = HDF5::openGroup(file_id, group_name, true); // final argument prevents the group from being created

            // Extract dataset names from the group
            std::vector<std::string> names = get_dset_names(group_id);

            for (auto it = names.begin(), end = names.end(); it != end; ++it)
            {
              if (param_set.find(*it) == param_set.end())
              {
                // New parameter name found; add it to the list to be processed.
                param_names.push_back(*it);
                param_set.insert(*it);
              }
            }

            if(not custom_mode)
            {
              // Get the RA dataset names
              aux_group_id = HDF5::openGroup(file_id, group_name+"/RA", true);
              std::vector<std::string> aux_names = get_dset_names(aux_group_id);

              for (auto it = aux_names.begin(), end = aux_names.end(); it != end; ++it)
              {
                if (aux_param_set.find(*it) == aux_param_set.end())
                {
                  // New aux parameter name found; add it to the list to be processed.
                  aux_param_names.push_back(*it);
                  aux_param_set.insert(*it);
                }
              }
            }

            HDF5::errorsOff();
            hid_t dataset = HDF5::openDataset(group_id, "pointID", true); // Allow failure to open
            hid_t dataset2 = HDF5::openDataset(group_id, "pointID_isvalid", true);
            HDF5::errorsOn();
            if(dataset<0 and dataset2<0)
            {
              // Probably the sync group is empty for this file; just skip it
              size = 0;
              size_tot_l = size_tot + size;
            }
            else if(dataset<0 or dataset2<0)
            {
              // Either the primary or the isvalid dataset is missing (but not both). Error!
              std::ostringstream errmsg;
              errmsg << "Error combining temporary HDF5 output! One of the temporary files is missing either the 'pointID' or the 'pointID_isvalid' dataset in the primary group (but not both, so the group is not empty!)";
              printer_error().raise(LOCAL_INFO, errmsg.str());
            }
            else
            {
              // Everything is fine; measure the dataset sizes
              hid_t dataspace = H5Dget_space(dataset);
              hid_t dataspace2 = H5Dget_space(dataset2);
              size = H5Sget_simple_extent_npoints(dataspace);
              hssize_t size2 = H5Sget_simple_extent_npoints(dataspace2);

              std::vector<bool> valids;
              Enter_HDF5<read_hdf5>(dataset2, valids);

              if (size != size2)
              {
                std::ostringstream errmsg;
                errmsg << "pointID and pointID_isvalid are not the same size.";
                printer_error().raise(LOCAL_INFO, errmsg.str());
              }

              size_tot_l = size_tot + size;

              // Trim trailing invalid entries off the measured size
              while (size > 0 and not valids[size-1]) --size;

              HDF5::closeSpace(dataspace);
              HDF5::closeSpace(dataspace2);
              HDF5::closeDataset(dataset);
              HDF5::closeDataset(dataset2);
            }
          }

          // Accumulate this file's contribution to the offsets of all later files
          for (size_t j = i+1, end = cum_sizes.size(); j < end; j++)
          {
            cum_sizes[j] += size;
          }
          sizes[i] = size;
          size_tot += size;

          // Record whether the file/groups could be opened
          files[i] = file_id;
          groups[i] = group_id;
          aux_groups[i] = aux_group_id;

          // Close the groups and the temp file
          if(group_id>-1) HDF5::closeGroup(group_id);
          if(aux_group_id>-1) HDF5::closeGroup(aux_group_id);
          if(file_id>-1) HDF5::closeFile(file_id);
        }
        std::cout << " Finished scanning temp files "<<std::endl;
      }
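
      // Illustration of the bookkeeping above (made-up numbers): if the
      // trimmed per-file sizes come out as sizes = {10, 5, 7}, then
      // cum_sizes = {0, 10, 15} gives the offset of each file's block in the
      // combined output, and size_tot = 22 is the total combined length.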

      hdf5_stuff::~hdf5_stuff()
      {
        // We would close here any files that somehow weren't closed earlier, but the
        // stored ids don't change when a file is closed, so checking them doesn't work:
        //for(std::vector<hid_t>::iterator it=files.begin(); it!=files.end(); ++it)
        //{
        //  if(*it>-1) HDF5::closeFile(*it);
        //}
      }

      void hdf5_stuff::Enter_Aux_Parameters(const std::string &file, bool resume)
      {
        std::vector<std::vector<unsigned long long>> ranks, ptids;
        std::vector<unsigned long long> aux_sizes;

        hid_t old_file = -1;
        hid_t old_group = -1;
        if (resume)
        {
          // Check whether 'file' exists
          if(Utils::file_exists(file))
          {
            std::string filebak = file + ".temp.bak";
            std::system(("mv " + file + " " + filebak).c_str());
            old_file = HDF5::openFile(file + ".temp.bak", false, 'r');
            if(old_file<0)
            {
              std::ostringstream errmsg;
              errmsg << "Error combining HDF5 temporary data! A previous combined output file was found (now moved to "<<filebak<<"), but it could not be successfully opened. It may be corrupted due to a bad shutdown. You could try deleting/moving the old combined data file and resuming again, though of course the old combined data will be lost." << std::endl;
              printer_error().raise(LOCAL_INFO, errmsg.str());
            }

            old_group = H5Gopen2(old_file, group_name.c_str(), H5P_DEFAULT);
            if(old_group<0)
            {
              std::ostringstream errmsg;
              errmsg << "Error combining HDF5 temporary data! A previous combined output file was found (now moved to "<<filebak<<"), but the expected dataset location "<<group_name<<" could not be successfully opened. It may be missing or corrupted due to a bad shutdown. You could try deleting/moving the old combined data file and resuming again, though of course the old combined data will be lost." << std::endl;
              printer_error().raise(LOCAL_INFO, errmsg.str());
            }

            hid_t old_dataset = HDF5::openDataset(old_group, "pointID", true); // Allow fail; we will throw a customised error message
            if(old_dataset<0)
            {
              std::ostringstream errmsg;
              errmsg << "Error combining HDF5 temporary data! A previous combined output file was found (now moved to "<<filebak<<"), but the 'pointID' dataset was not found in the expected location ("<<group_name<<"). It may be missing or corrupted due to a bad shutdown. You could try deleting/moving the old combined data file and resuming again, though of course the old combined data will be lost." << std::endl;
              printer_error().raise(LOCAL_INFO, errmsg.str());
            }
            hid_t space = HDF5::getSpace(old_dataset);
            hsize_t extra = HDF5::getSimpleExtentNpoints(space);
            HDF5::closeSpace(space);
            HDF5::closeDataset(old_dataset);
            size_tot += extra;

            // Check for parameters not found in the newer temporary files.
            // (There should not be any aux parameters in here, so don't check for them.)
            std::vector<std::string> names = get_dset_names(old_group);

            for (auto it = names.begin(), end = names.end(); it != end; ++it)
            {
              if (param_set.find(*it) == param_set.end())
              {
                // New parameter name found; add it to the list to be processed.
                param_names.push_back(*it);
                param_set.insert(*it);
              }
            }
          }
          else
          {
            // This is ok; on the first resume no previous combined output exists.
            old_file = -1;
            old_group = -1;
          }
        }

        if(size_tot==0 and aux_param_names.size()==0)
        {
          // If size_tot==0 then there are no primary sync datapoints in any of the temp files (or previous combined output).
          // If there are also no auxiliary datapoints, then there is nothing to combine! Raise an error, because
          // this is a weird situation and the run would effectively be starting again anyway.
          // TODO: Keep this? Someone could restart a run from old scanner data, but just have deleted all the old
          // printer output. Not sure if we want to allow this or not.
          std::ostringstream errmsg;
          errmsg << "Error combining HDF5 temporary data! No previous combined output exists (this may be the first resume attempt), but there were also no primary sync datasets found in the temporary output. This means that if some RA data also exists then there are no points available to write it into! Something has therefore gone wrong during either the printing or combining stages.";
          printer_error().raise(LOCAL_INFO, errmsg.str());
        }
        else if(size_tot==0 and aux_param_names.size()!=0)
        {
          // Now size_tot==0 AND there are auxiliary datapoints to be combined. Currently we require primary
          // datapoints to exist in order to write auxiliary data to them.
          std::ostringstream errmsg;
          errmsg << "Error combining HDF5 temporary data! No model points in primary (synchronised) datasets were found in the existing temporary files, however auxiliary datasets WERE found. Currently auxiliary datasets cannot be 'combined' with nothing, i.e. they must have a corresponding model point in a 'primary' dataset with which to be associated. It is possible to loosen this requirement, so if you have written a scanner or likelihood container which writes only to auxiliary datasets then please report this error and request that the print system be upgraded. Otherwise this error indicates a bug; please report it." << std::endl;
          printer_error().raise(LOCAL_INFO, errmsg.str());
        }
        // else everything is fine

        hid_t new_file = HDF5::openFile(file,false,'w'); // No overwrite allowed; this file shouldn't exist
        if(new_file<0)
        {
          std::ostringstream errmsg;
          errmsg << "Failed to create output file '"<<file<<"'!";
          printer_error().raise(LOCAL_INFO, errmsg.str());
        }

        hid_t new_group = HDF5::openGroup(new_file, group_name); // Recursively creates the required group structure

        // Collect the RA rank/pointID index data from each temp file
        if (aux_param_names.size() > 0) for (unsigned long i=0; i<aux_groups.size(); ++i)
        {
          std::vector<unsigned long long> rank, ptid;

          // Reopen temp files and reacquire group IDs
          std::string fname = get_fname(i);
          hid_t file_id = HDF5::openFile(fname);
          files[i] = file_id; // keep file ID up to date
          hid_t aux_group_id = HDF5::openGroup(file_id, group_name+"/RA", true); // final argument prevents the group from being created
          hid_t dataset;
          if(aux_group_id < 0)
          {
            dataset = -1; // If the group doesn't exist, the dataset doesn't either
          }
          else
          {
            HDF5::errorsOff();
            dataset = HDF5::openDataset(aux_group_id, "RA_MPIrank", true);
            HDF5::errorsOn();
          }

          if(dataset < 0) // If the key dataset doesn't exist, set the aux size to zero for this rank
          {
            // Need to push back empty entries, because they must remain synced with the dataset vectors
            ranks.push_back(rank);
            ptids.push_back(ptid);
            aux_sizes.push_back(0);
          }
          else
          {
            hid_t dataset2 = HDF5::openDataset(aux_group_id, "RA_pointID");
            hid_t dataset3 = HDF5::openDataset(aux_group_id, "RA_pointID_isvalid");

            Enter_HDF5<read_hdf5>(dataset, rank);
            Enter_HDF5<read_hdf5>(dataset2, ptid);
            // Check that the extracted rank/ptid vectors are the same length
            if(rank.size() != ptid.size())
            {
              std::ostringstream errmsg;
              errmsg << "Extracted RA_MPIrank and RA_pointID are not the same size! ("<<rank.size()<<"!="<<ptid.size()<<")";
              printer_error().raise(LOCAL_INFO, errmsg.str());
            }

            hid_t dataspace = H5Dget_space(dataset2);
            hid_t dataspace2 = H5Dget_space(dataset3);
            hssize_t size = H5Sget_simple_extent_npoints(dataspace);
            hssize_t size2 = H5Sget_simple_extent_npoints(dataspace2);

            std::vector<bool> valids;
            Enter_HDF5<read_hdf5>(dataset3, valids);

            if (size != size2)
            {
              std::ostringstream errmsg;
              errmsg << "RA_pointID and RA_pointID_isvalid are not the same size.";
              printer_error().raise(LOCAL_INFO, errmsg.str());
            }

            // Make sure to only add the isvalid points!
            std::vector<unsigned long long> valid_rank;
            std::vector<unsigned long long> valid_ptid;
            for(size_t vi=0; vi<valids.size(); vi++)
            {
              if(valids[vi])
              {
                valid_rank.push_back(rank[vi]);
                valid_ptid.push_back(ptid[vi]);
              }
            }
            ranks.push_back(valid_rank);
            ptids.push_back(valid_ptid);

            // Trim trailing invalid entries off the measured size (as for the primary datasets)
            while (size > 0 and not valids[size-1]) --size;
            aux_sizes.push_back(size);

            HDF5::closeSpace(dataspace);
            HDF5::closeSpace(dataspace2);
            HDF5::closeDataset(dataset);
            HDF5::closeDataset(dataset2);
            HDF5::closeDataset(dataset3);
          }
          // Close resources
          if(aux_group_id>=0) HDF5::closeGroup(aux_group_id);
          if(file_id>=0) HDF5::closeFile(file_id);
        }

        // Copy the primary datasets over in batches of files, to avoid
        // "too many open files" errors when combining very many temp files.
        const size_t BATCH_SIZE = 50;

        // Compute the number of batches required
        size_t N_BATCHES = files.size() / BATCH_SIZE;
        size_t remainder = files.size() % BATCH_SIZE;
        if(remainder > 0)
        {
          N_BATCHES += 1;
        }
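
        // For example, 120 temp files with BATCH_SIZE = 50 gives N_BATCHES = 3:
        // two full batches of 50 files plus a final incomplete batch of
        // remainder = 20 files.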

        int counter = 1;
        for (auto it = param_names.begin(), end = param_names.end(); it != end; ++it, ++counter)
        {
          // Simple progress monitor
          std::cout << " Combining primary datasets... "<<int(100*counter/param_names.size())<<"% (copied "<<counter<<" parameters of "<<param_names.size()<<")        \r"<<std::flush;

          size_t offset = 0; // where to begin writing the next batch in the output datasets
          for(size_t batch_i = 0, end = N_BATCHES; batch_i < end; batch_i++)
          {
            size_t pc_offset = 0; // possible extra offset due to a previous combined dataset; only batch 0 will use this
            long long valid_dset = -1; // index of a validly opened dataset (-1 if none)
            size_t THIS_BATCH_SIZE = BATCH_SIZE;
            if(remainder>0 and batch_i==N_BATCHES-1) THIS_BATCH_SIZE = remainder; // The last batch is incomplete

            // IDs and pre-measured dataset sizes for this batch
            std::vector<unsigned long long> batch_sizes(THIS_BATCH_SIZE,0);
            std::vector<hid_t> file_ids (THIS_BATCH_SIZE,-1);
            std::vector<hid_t> group_ids(THIS_BATCH_SIZE,-1);
            std::vector<hid_t> datasets (THIS_BATCH_SIZE,-1);
            std::vector<hid_t> datasets2(THIS_BATCH_SIZE,-1);

            // Collect all the HDF5 ids needed to combine this parameter for this batch of files
            for (size_t file_i = 0, end = THIS_BATCH_SIZE; file_i < end; file_i++)
            {
              size_t i = file_i + batch_i * BATCH_SIZE;

              hid_t file_id = -1;
              hid_t group_id = -1;
              hid_t dataset = -1;
              hid_t dataset2 = -1;
              batch_sizes[file_i] = sizes[i]; // Pre-measured dataset size for this file

              // Skip this file if it wasn't successfully opened earlier
              if(files[i]>=0)
              {
                // Reopen files
                // NOTE: RAM problems are largely solved, but the footprint could be lowered even more if we only opened one file at a time.
                std::string fname = get_fname(i);
                if(not HDF5::checkFileReadable(fname))
                {
                  std::cout <<"files["<<i<<"] = "<<files[i]<<std::endl;
                  std::cerr<<"WARNING! "<<fname<<" was not readable! This should have been caught earlier!"<<std::endl;
                }
                file_id = HDF5::openFile(fname);
                files[i] = file_id;
                group_id = HDF5::openGroup(file_id, group_name, true); // final argument prevents the group from being created
                if(group_id>=0)
                {
                  HDF5::errorsOff();
                  dataset = HDF5::openDataset(group_id, *it, true); // Allow fail; not all parameters must exist in all temp files
                  dataset2 = HDF5::openDataset(group_id, *it + "_isvalid", true);
                  HDF5::errorsOn();
                }

                if(dataset>=0)
                {
                  if(dataset2>=0) valid_dset = file_i; // Use this one to determine dataset types for this batch (TODO: will be bad if it doesn't match other batches! Could make this more robust)
                  else
                  {
                    std::ostringstream errmsg;
                    errmsg << "Error opening dataset '"<<*it<<"_isvalid' from temp file "<<i<<"! The main dataset was opened, but the 'isvalid' dataset failed to open! It may be corrupted.";
                    printer_error().raise(LOCAL_INFO, errmsg.str());
                  }
                }
              }

              datasets [file_i] = dataset;
              datasets2[file_i] = dataset2;
              file_ids [file_i] = file_id;
              group_ids[file_i] = group_id;
            }

            // Get ids for the old combined output for this parameter
            hid_t old_dataset = -1, old_dataset2 = -1;
            if (resume and batch_i==0) // Only copy the old dataset once, during the first batch of files
            {
              HDF5::errorsOff();
              old_dataset = HDF5::openDataset(old_group, *it, true); // Allow fail; there may be no previous combined output
              old_dataset2 = HDF5::openDataset(old_group, *it + "_isvalid", true);
              HDF5::errorsOn();
              if(old_dataset<0) std::cout << "Failed to open previous combined dataset for parameter "<<*it<<std::endl; // debugging
              if(old_dataset>=0)
              {
                hid_t space = H5Dget_space(old_dataset);
                hsize_t dim_t = H5Sget_simple_extent_npoints(space);
                H5Sclose(space);
                pc_offset = dim_t; // Record the size of the old combined dataset, needed to get the correct offset for the next batch
              }
            }

            // With the batching system we can get a batch with no data for this parameter. This is ok;
            // we just skip it.
            // TODO: Could check that SOME batch copies data. This should always happen, since the
            // parameter name must have been retrieved from *some* file or other, so it would just be
            // double-checking that this worked correctly.
            if(valid_dset>=0 or old_dataset>=0)
            {
              hid_t type, type2;
              if(valid_dset<0)
              {
                // The parameter is not in any of the new temp files; it must only be in the old combined output, so get the type from there.
                type = H5Dget_type(old_dataset);
                type2 = H5Dget_type(old_dataset2);
              }
              else
              {
                // Get the type from a validly opened dataset in a temp file
                type = H5Dget_type(datasets[valid_dset]);
                type2 = H5Dget_type(datasets2[valid_dset]);
              }
              if(type<0 or type2<0)
              {
                std::ostringstream errmsg;
                errmsg << "Failed to detect type for dataset '"<<*it<<"'! The dataset is supposedly valid, so this does not make sense. It must be a bug, please report it. (valid_dset="<<valid_dset<<")";
                printer_error().raise(LOCAL_INFO, errmsg.str());
              }

              if(batch_i==0) // Only want to create the output dataset once!
              {
                // Create the output datasets
                setup_hdf5_points(new_group, type, type2, size_tot, *it);
              }

              // Reopen the output datasets for writing
              hid_t dataset_out = HDF5::openDataset(new_group, *it);
              hid_t dataset2_out = HDF5::openDataset(new_group, (*it)+"_isvalid");

              // Measure the total size of the datasets in this batch of files
              // (not counting previous combined output, which is copied in batch 0)
              unsigned long long batch_size_tot = 0;
              for(auto jt = batch_sizes.begin(); jt != batch_sizes.end(); ++jt)
              {
                batch_size_tot += *jt;
              }

              // Do the copy!
              Enter_HDF5<copy_hdf5>(dataset_out, datasets, batch_size_tot, batch_sizes, old_dataset, offset);
              Enter_HDF5<copy_hdf5>(dataset2_out, datasets2, batch_size_tot, batch_sizes, old_dataset2, offset);

              // Close resources
              HDF5::closeDataset(dataset_out);
              HDF5::closeDataset(dataset2_out);

              // Move the offset so that the next batch is written to the correct place in the output file
              offset += batch_size_tot + pc_offset;
            }
            else
            {
              std::cout << "No datasets found for parameter "<<*it<<" in file batch "<<batch_i<<". Moving on to the next batch, since there seems to be nothing to copy in this one." << std::endl;
            }

            // Close the files etc. associated with this batch
            for (size_t file_i = 0, end = file_ids.size(); file_i < end; file_i++)
            {
              if(datasets[file_i]>=0) HDF5::closeDataset(datasets[file_i]);
              if(datasets2[file_i]>=0) HDF5::closeDataset(datasets2[file_i]);
              if(group_ids[file_i]>=0) HDF5::closeGroup(group_ids[file_i]);
              if(file_ids[file_i]>=0) HDF5::closeFile(file_ids[file_i]);
            }

          } // end batch; begin processing the next batch of files
        }
        std::cout << " Combining primary datasets... Done. "<<std::endl;

        // TODO: Batch copying has not yet been implemented for the RA datasets!

        // Ben: Before copying the RA points, we need to figure out a map between them
        // and their targets in the output dataset. That means we need to read through the
        // output dataset and read in all the pointID/MPIrank pairs.
        // We only need to do this once, and can create one big hash table to use while copying.
        // It would be better to do all the copying in chunks, in which case we would also
        // only need chunks of this hash table at a time, but this code wasn't written
        // with chunked writing in mind. TODO: modify the code to do this.

        // We already know all the RA rank/pointID pairs, so we just need to scan the output
        // datasets for the matching pairs, and record their indices.

        // Start with a list of ID pairs to be matched
        if(not custom_mode)
        { // ranks and pointIDs are not guaranteed to be unique in custom mode! RA datasets are to be ignored there; they should be merged prior to custom combining.
          std::unordered_set<PPIDpair,PPIDHash,PPIDEqual> left_to_match;
          for(std::size_t i=0; i<ranks.size(); ++i)
          {
            for(std::size_t j=0; j<ranks[i].size(); ++j)
            {
              left_to_match.insert(PPIDpair(ptids[i][j],ranks[i][j]));
            }
          }

          if(left_to_match.size()>0)
          {
            std::unordered_map<PPIDpair, unsigned long long, PPIDHash,PPIDEqual> RA_write_hash(get_RA_write_hash(new_group, left_to_match));

            counter = 1; // reset counter
            for (auto it = aux_param_names.begin(), end = aux_param_names.end(); it != end; ++it, ++counter)
            {
              // Simple progress monitor
              std::cout << " Combining auxiliary datasets... "<<int(100*counter/aux_param_names.size())<<"% (merged "<<counter<<" parameters of "<<aux_param_names.size()<<")        \r"<<std::flush;
              std::vector<hid_t> file_ids, group_ids, datasets, datasets2;
              int valid_dset = -1; // index of a validly opened dataset (-1 if none)

              #ifdef COMBINE_DEBUG
              std::cerr << " Preparing to copy dataset '"<<*it<<"'" << std::endl;
              #endif

              for (int i = 0, end = aux_groups.size(); i < end; i++)
              {
                hid_t file_id = -1;
                hid_t group_id = -1;
                hid_t dataset = -1;
                hid_t dataset2 = -1;

                // Skip this file if it wasn't successfully opened earlier
                if(files[i]>-1)
                {
                  // Reopen files
                  std::string fname = get_fname(i);
                  file_id = HDF5::openFile(fname);
                  files[i] = file_id;
                  group_id = HDF5::openGroup(file_id, group_name+"/RA", true); // final argument prevents the group from being created

                  // The datasets may not exist, and thus fail to open. We check their status
                  // later on and ignore them where needed.
                  if(group_id>=0)
                  {
                    HDF5::errorsOff();
                    dataset = HDF5::openDataset(group_id, *it, true); // Allow fail; not all parameters must exist in all temp files
                    dataset2 = HDF5::openDataset(group_id, *it + "_isvalid", true);
                    HDF5::errorsOn();
                  }

                  if(dataset>=0)
                  {
                    if(dataset2>=0) valid_dset = i;
                    else
                    {
                      std::ostringstream errmsg;
                      errmsg << "Error opening dataset '"<<*it<<"_isvalid' from temp file "<<i<<"! The main dataset was opened, but the 'isvalid' dataset failed to open! It may be corrupted.";
                      printer_error().raise(LOCAL_INFO, errmsg.str());
                    }
                  }
                }
                datasets.push_back(dataset);
                datasets2.push_back(dataset2);
                file_ids.push_back(file_id);
                group_ids.push_back(group_id);
              }

              hid_t old_dataset = -1, old_dataset2 = -1;
              // There is no need to copy the old data here; it has already been
              // copied as part of the primary dataset copying.

              // If the aux parameter was not also copied as a primary parameter then we need to create a new
              // dataset for it here. Otherwise one should already exist.
              if(param_set.find(*it) == param_set.end())
              {
                #ifdef COMBINE_DEBUG
                std::cerr << " No output dataset for '"<<*it<<"' found amongst those created during copying of primary parameters; preparing to create it." << std::endl;
                #endif

                hid_t type=-1, type2=-1;
                if(valid_dset<0)
                {
                  // No valid dataset open, but we are supposed to copy something? Error.
                  std::ostringstream errmsg;
                  errmsg << "Error copying RA points for dataset '"<<*it<<"'. No valid datasets could be opened, though they were detected in the temp files. They may be corrupted.";
                  printer_error().raise(LOCAL_INFO, errmsg.str());
                }
                else
                {
                  // Get the type from a validly opened dataset in a temp file
                  type = H5Dget_type(datasets[valid_dset]);
                  type2 = H5Dget_type(datasets2[valid_dset]);
                  if(type<0 or type2<0)
                  {
                    std::ostringstream errmsg;
                    errmsg << "Failed to detect type for RA dataset '"<<*it<<"'! The dataset is supposedly valid, so this does not make sense. It must be a bug, please report it.";
                    printer_error().raise(LOCAL_INFO, errmsg.str());
                  }
                }
                // Create the new dataset
                setup_hdf5_points(new_group, type, type2, size_tot, *it);
              }
              // Reopen the output datasets for copying
              hid_t dataset_out = HDF5::openDataset(new_group, *it);
              hid_t dataset2_out = HDF5::openDataset(new_group, (*it)+"_isvalid");
              Enter_HDF5<ra_copy_hdf5>(dataset_out, dataset2_out, datasets, datasets2, size_tot, RA_write_hash, ptids, ranks, aux_sizes, old_dataset, old_dataset2);

              // Close resources
              for (int i = 0, end = datasets.size(); i < end; i++)
              {
                // Some datasets may never have been opened, so check this before trying to close them
                if(datasets[i]>=0) HDF5::closeDataset(datasets[i]);
                if(datasets2[i]>=0) HDF5::closeDataset(datasets2[i]);
                if(group_ids[i]>=0) HDF5::closeGroup(group_ids[i]);
                if(file_ids[i]>=0) HDF5::closeFile(file_ids[i]);
              }
            }
            std::cout << " Combining auxiliary datasets... Done. "<<std::endl;
          }
          else
          {
            std::cout << " Combining auxiliary datasets... None found, skipping. "<<std::endl;
          }
        }

        // Close the old combined file
        if(old_group>=0) HDF5::closeGroup(old_group);
        if(old_file>=0) HDF5::closeFile(old_file);

        // Flush and close the output file
        H5Fflush(new_file, H5F_SCOPE_GLOBAL);
        HDF5::closeGroup(new_group);
        HDF5::closeFile(new_file);

        if (do_cleanup and not custom_mode) // Cleanup is disabled in custom mode; it is only for "routine" combination during scan resuming
        {
          if (resume)
          {
            std::system(("rm -f " + file + ".temp.bak").c_str());
          }

          // Delete the temp files now that they have been combined
          for (size_t i = 0, end = files.size(); i < end; i++)
          {
            std::stringstream ss;
            ss << i;
            std::system(("rm -f " + root_file_name + "_temp_" + ss.str()).c_str());
          }
        }
      }
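
      // Overall usage sketch (illustrative only; the real driver lives
      // elsewhere in GAMBIT, and the file/group names here are placeholders):
      //
      //   // Combine runs/scan.hdf5_temp_0 .. runs/scan.hdf5_temp_3 into a
      //   // single combined file, with datasets under the group "/data":
      //   hdf5_stuff stuff("runs/scan.hdf5", "runs/scan.hdf5", "/data",
      //                    4, true, false, std::vector<std::string>());
      //   stuff.Enter_Aux_Parameters("runs/scan.hdf5", true); // resume mode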

      // Helper function to create the output hash map for RA points.
      // Note: left_to_match points are erased from the set as they are matched.
      std::unordered_map<PPIDpair, unsigned long long, PPIDHash,PPIDEqual> get_RA_write_hash(hid_t group_id, std::unordered_set<PPIDpair,PPIDHash,PPIDEqual>& left_to_match)
      {
        std::unordered_map<PPIDpair, unsigned long long, PPIDHash,PPIDEqual> output_hash;

        // Chunking variables
        static const std::size_t CHUNKLENGTH = 10000; // Should be a reasonable value

        // Interfaces for the datasets.
        // Make sure the types used here don't get out of sync with the types used to write the original datasets.
        // We open the datasets in "resume" mode to access the existing data, and make them "const" to disable
        // writing of new data, i.e. "read-only" mode.
        // TODO: this can probably be streamlined once the HDF5 reader is written; some reading routines could be consolidated.
        const DataSetInterfaceScalar<unsigned long, CHUNKLENGTH> pointIDs(group_id, "pointID", true,'r');
        const DataSetInterfaceScalar<int, CHUNKLENGTH> pointIDs_isvalid (group_id, "pointID_isvalid", true,'r');
        const DataSetInterfaceScalar<int, CHUNKLENGTH> mpiranks (group_id, "MPIrank", true,'r');
        const DataSetInterfaceScalar<int, CHUNKLENGTH> mpiranks_isvalid (group_id, "MPIrank_isvalid", true,'r');

        // Error-check the lengths. This should already have been done for all datasets in the group, but
        // we double-check these four here.
        const std::size_t dset_length  = pointIDs.dset_length();
        const std::size_t dset_length2 = pointIDs_isvalid.dset_length();
        const std::size_t dset_length3 = mpiranks.dset_length();
        const std::size_t dset_length4 = mpiranks_isvalid.dset_length();
        if( (dset_length != dset_length2)
            or (dset_length3 != dset_length4)
            or (dset_length != dset_length3) )
        {
          std::ostringstream errmsg;
          errmsg << "Error scanning combined output for RA point locations! Unequal dataset lengths detected in pointID and MPIrank datasets:" <<std::endl;
          errmsg << "  pointIDs.dset_length()         = " << dset_length << std::endl;
          errmsg << "  pointIDs_isvalid.dset_length() = " << dset_length2 << std::endl;
          errmsg << "  mpiranks.dset_length()         = " << dset_length3 << std::endl;
          errmsg << "  mpiranks_isvalid.dset_length() = " << dset_length4 << std::endl;
          errmsg << "This indicates a bug in the HDF5 combine code; please report it.";
          printer_error().raise(LOCAL_INFO, errmsg.str());
        }

        // Compute the number of chunks
        const std::size_t NCHUNKS   = dset_length / CHUNKLENGTH;           // Number of FULL chunks
        const std::size_t REMAINDER = dset_length - (NCHUNKS*CHUNKLENGTH); // leftover after the last full chunk

        std::size_t NCHUNKIT; // Number of chunk iterations to perform
        if(REMAINDER==0) { NCHUNKIT = NCHUNKS; }
        else             { NCHUNKIT = NCHUNKS+1; } // Need an extra iteration to deal with the incomplete chunk
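
        // For example, dset_length = 25000 with CHUNKLENGTH = 10000 gives
        // NCHUNKS = 2 full chunks and REMAINDER = 5000, hence NCHUNKIT = 3
        // iterations, the last of which reads only the 5000 leftover entries.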

        // Iterate through the dataset in chunks
        for(std::size_t i=0; i<NCHUNKIT; ++i)
        {
          std::size_t offset = i*CHUNKLENGTH;
          std::size_t length;
          if(i==NCHUNKS){ length = REMAINDER; }
          else          { length = CHUNKLENGTH; }

          const std::vector<unsigned long> pID_chunk = pointIDs.get_chunk(offset,length);
          const std::vector<int> pIDvalid_chunk  = pointIDs_isvalid.get_chunk(offset,length);
          const std::vector<int> rank_chunk      = mpiranks.get_chunk(offset,length);
          const std::vector<int> rankvalid_chunk = mpiranks_isvalid.get_chunk(offset,length);

          // Check that the retrieved lengths make sense
          if (pID_chunk.size() != CHUNKLENGTH)
          {
            if(not (i==NCHUNKS and pID_chunk.size()==REMAINDER) )
            {
              std::ostringstream errmsg;
              errmsg << "Error scanning combined pointID and MPIrank datasets! The size of the chunk vector retrieved from the pointID dataset ("<<pID_chunk.size()<<") matches neither CHUNKLENGTH ("<<CHUNKLENGTH<<") nor the expected remainder for the last chunk ("<<REMAINDER<<"). This probably indicates a bug in the DataSetInterfaceScalar.get_chunk routine; please report it. Error occurred while reading chunk i="<<i<<std::endl;
              printer_error().raise(LOCAL_INFO, errmsg.str());
            }
          }
          if( (pID_chunk.size() != pIDvalid_chunk.size())
              or (rank_chunk.size() != rankvalid_chunk.size())
              or (pID_chunk.size() != rank_chunk.size()) )
          {
            std::ostringstream errmsg;
            errmsg << "Error preparing to combine RA points into the output dataset! Unequal chunk lengths were retrieved while iterating through the pointID and MPIrank datasets:" <<std::endl;
            errmsg << "  pID_chunk.size()       = " << pID_chunk.size() << std::endl;
            errmsg << "  pIDvalid_chunk.size()  = " << pIDvalid_chunk.size() << std::endl;
            errmsg << "  rank_chunk.size()      = " << rank_chunk.size() << std::endl;
            errmsg << "  rankvalid_chunk.size() = " << rankvalid_chunk.size() << std::endl;
            errmsg << "  CHUNKLENGTH            = " << CHUNKLENGTH << std::endl;
            errmsg << "This indicates a bug in the HDF5 combine code; please report it. Error occurred while reading chunk i="<<i<<std::endl;
            printer_error().raise(LOCAL_INFO, errmsg.str());
          }

          // Iterate within the chunk
          for(std::size_t j=0; j<length; ++j)
          {
            // Check that the validity flags agree
            if(pIDvalid_chunk[j] != rankvalid_chunk[j])
            {
              std::ostringstream errmsg;
              errmsg << "Error! Incompatible validity flags detected in the pointID_isvalid and MPIrank_isvalid datasets at position j="<<j<<" in chunk i="<<i<<" (with CHUNKLENGTH="<<CHUNKLENGTH<<"). Specifically:"<<std::endl;
              errmsg << "  pIDvalid_chunk[j]  = " << pIDvalid_chunk[j] << std::endl;
              errmsg << "  rankvalid_chunk[j] = " << rankvalid_chunk[j] << std::endl;
              errmsg << "This most likely indicates a bug in the HDF5 combine code; please report it.";
              printer_error().raise(LOCAL_INFO, errmsg.str());
            }

            // Check for a hash match if the entry is marked as "valid"
            if(rankvalid_chunk[j])
            {
              // Check whether this point is in our list of points to be matched
              PPIDpair this_point(pID_chunk[j],rank_chunk[j]);
              std::unordered_set<PPIDpair,PPIDHash,PPIDEqual>::iterator match = left_to_match.find(this_point);
              if(match != left_to_match.end())
              {
                // Found a match! Add its index to the hash.
                output_hash[*match] = offset + j;
                // Delete it from the list of points that need to be matched (note: multiple entries in the output file are not allowed)
                left_to_match.erase(match);
              }
            }
            // else continue iterating
          }
        }

        // Check that all the matches were found!
        if( left_to_match.size() > 0 )
        {
          std::ostringstream errmsg;
          errmsg << "Error generating the hash map for the auxiliary parameter copy! Failed to find matches in the primary datasets for all auxiliary data! There were "<<left_to_match.size()<<" unmatched auxiliary points. The unmatched points follow:" << std::endl;
          for(auto it=left_to_match.begin(); it!=left_to_match.end(); ++it)
          {
            errmsg << "  rank: "<<it->rank<<", pointID: "<<it->pointID<< std::endl;
          }
          printer_error().raise(LOCAL_INFO, errmsg.str());
        }
        return output_hash;
      }

      // Search for temporary files to be combined
      std::pair<std::vector<std::string>,std::vector<size_t>> find_temporary_files(const std::string& finalfile)
      {
        size_t max_i = 0;
        return find_temporary_files(finalfile, max_i);
      }

      // As above, but also reports (via max_i) the highest rank index found among the temp files
      std::pair<std::vector<std::string>,std::vector<size_t>> find_temporary_files(const std::string& finalfile, size_t& max_i)
      {
        // Autodetect temporary files from a previous run.
        std::string output_dir = Utils::dir_name(finalfile);
        std::vector<std::string> files = Utils::ls_dir(output_dir);
        std::string tmp_base(Utils::base_name(finalfile) + "_temp_");
        std::vector<size_t> ranks;
        std::vector<std::string> result;

        for(auto it=files.begin(); it!=files.end(); ++it)
        {
          if (it->compare(0, tmp_base.length(), tmp_base) == 0)
          {
            // Matches the format of a temporary file! Extract the rank that produced it.
            std::stringstream ss;
            ss << it->substr(tmp_base.length());
            if(Utils::isInteger(ss.str())) // Only do this where the remainder of the string is just an integer (i.e. not the combined files etc.)
            {
              size_t rank;
              ss >> rank; // TODO: check for a failed read
              if(rank>max_i) max_i = rank; // Track the highest-rank output found
              ranks.push_back(rank);
              result.push_back(output_dir+"/"+*it);
            }
          }
        }

        // Check whether all temporary files were found (i.e. whether output from some rank is missing)
        std::vector<size_t> missing;
        for(size_t i=0; i<max_i; ++i)
        {
          if(std::find(ranks.begin(), ranks.end(), i) == ranks.end())
          { missing.push_back(i); }
        }

        // Return the list of found files and the list of missing ranks; the caller can decide whether to throw an error
        return std::make_pair(result,missing);
      }
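
      // For example (hypothetical directory contents): if "runs/" contains
      // scan.hdf5_temp_0, scan.hdf5_temp_1 and scan.hdf5_temp_3, then
      // find_temporary_files("runs/scan.hdf5", max_i) returns those three
      // paths, sets max_i = 3, and reports missing = {2}, letting the caller
      // decide whether the absent rank-2 output is an error.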

    } // end namespace HDF5
  } // end namespace Printers
} // end namespace Gambit