gambit is hosted by Hepforge, IPPP Durham
GAMBIT  v1.5.0-252-gf9a3f78
a Global And Modular Bsm Inference Tool
hdf5printer.cpp
Go to the documentation of this file.
1 // GAMBIT: Global and Modular BSM Inference Tool
2 // *********************************************
84 
85 
86 // Standard libraries
87 #include <map>
88 #include <vector>
89 #include <algorithm>
90 #include <ios>
91 #include <sstream>
92 #include <iostream>
93 #include <fstream>
94 #include <iomanip>
95 #include <cstdlib> // For popen in finalise()
96 #include <chrono>
97 
98 // Gambit
103 
104 #include "gambit/cmake/cmake_variables.hpp"
109 #include "gambit/Logs/logger.hpp"
110 
111 // MPI bindings
114 
115 // Switch for debugging output (manual at the moment)
116 
117 #ifdef DEBUG_MODE
118 #define DBUG(x) x
119 #else
120 #define DBUG(x)
121 #endif
122 
123 //#define CHECK_SYNC
124 
125 // Debugging for 'finalise' routine
126 #define FINAL_CHECK_SYNC
127 #define FINAL_DEBUG_MODE
128 
129 // Code!
130 namespace Gambit
131 {
132  namespace Printers
133  {
134 
135  // Locally defined helper struct
136  struct DSetData
137  {
138  // Input data for HDF5 helper functions
139  int rank;
140 
141  // Dataset metadata
142  std::vector<std::string> names;
143  std::vector<unsigned long> lengths;
144 
146  // // Contents of pointID and mpirank datasets
147  // std::vector<unsigned long> pointIDs;
148  // std::vector<int> pointIDs_isvalid;
149  // std::vector<unsigned int> mpiranks;
150  // std::vector<int> mpiranks_isvalid;
151 
152  // Report error
153  std::string local_info;
154  std::string errmsg;
155 
156  DSetData(int r) : rank(r) {}
157  };
158 
159  // Helper function to check for GAMBIT shutdown messages due to errors in other processes
161  {
162  signaldata().check_if_shutdown_begun(); // Will throw a shutdown exception if an emergency shutdown command is received via MPI
163  }
164 
165  // Helper function for examining existing HDF5 file during verification stage
166  // Finds the highest PPID for our rank
167  // (separate function checks datasets for consistent lengths; that should run first)
168 
169  // Note; resumed runs may use different numbers of processes to the initial run.
170  // But this should be no problem; if there are new processes added, the highest
171  // previous PPID number for those ranks is just zero. If there are fewer, then
172  // there will be still be matching old-process ranks for all of the new ranks.
173  //PPIDpair
174  std::map<unsigned long, unsigned long long int> HDF5Printer::get_highest_PPID_from_HDF5(hid_t group_id)
175  {
176  //unsigned long long int highest_pointID = 0; // Highest ID found so far
177  std::map<unsigned long, unsigned long long int> highest_pointIDs;
178 
179  // Chunking variables
180  static const std::size_t CHUNKLENGTH = 1000; // Should be a reasonable value
181 
182  // Interfaces for the datasets
183  // Make sure the types used here don't get out of sync with the types used to write the original datasets
184  // We open the datasets in "resume" mode to access existing dataset, and make "const" to disable writing of new data. i.e. "Read-only" mode.
185  const DataSetInterfaceScalar<unsigned long long, CHUNKLENGTH> pointIDs(group_id, "pointID", true, 'r');
186  const DataSetInterfaceScalar<int, CHUNKLENGTH> pointIDs_isvalid (group_id, "pointID_isvalid", true, 'r');
187  const DataSetInterfaceScalar<int, CHUNKLENGTH> mpiranks (group_id, "MPIrank", true, 'r');
188  const DataSetInterfaceScalar<int, CHUNKLENGTH> mpiranks_isvalid (group_id, "MPIrank_isvalid", true, 'r');
189 
190  // Error check lengths. This should already have been done for all datasets in the group, but
191  // we will double-check these four here.
192  const std::size_t dset_length = pointIDs.dset_length();
193  const std::size_t dset_length2 = pointIDs_isvalid.dset_length();
194  const std::size_t dset_length3 = mpiranks.dset_length();
195  const std::size_t dset_length4 = mpiranks_isvalid.dset_length();
196  if( (dset_length != dset_length2)
197  or (dset_length3 != dset_length4)
198  or (dset_length != dset_length3) )
199  {
200  std::ostringstream errmsg;
201  errmsg << "Error retrieving highest PPID from previous dataset! Unequal dataset lengths detected in pointID and MPIrank datasets:" <<std::endl;
202  errmsg << " pointIDs.dset_length() = " << dset_length << std::endl;
203  errmsg << " pointIDs_isvalid.dset_length() = " << dset_length2 << std::endl;
204  errmsg << " mpiranks.dset_length() = " << dset_length3 << std::endl;
205  errmsg << " mpiranks_isvalid.dset_length() = " << dset_length4 << std::endl;
206  errmsg << "This indicates either a bug in the HDF5printer or corruption of the datasets (possibly due to unsafe shutdown).";
207  printer_error().raise(LOCAL_INFO, errmsg.str());
208  }
209 
210  // Compute number of chunks
211  const std::size_t NCHUNKS = dset_length / CHUNKLENGTH; // Number of FULL chunks
212  const std::size_t REMAINDER = dset_length - (NCHUNKS*CHUNKLENGTH); // leftover after last full chunk
213 
214  std::size_t NCHUNKIT; // Number of chunk iterations to perform
215  if(REMAINDER==0) { NCHUNKIT = NCHUNKS; }
216  else { NCHUNKIT = NCHUNKS+1; } // Need an extra iteration to deal with incomplete chunk
217 
218  logger()<<"Begining iteration through existing HDF5 output for rank "<<getRank()<<", searching for previous highest pointID."<<EOM;
219 
220  // Iterate through dataset in chunks
221  for(std::size_t i=0; i<NCHUNKIT; ++i)
222  {
223  std::size_t offset = i*CHUNKLENGTH;
224  std::size_t length;
225 
226  if(i==NCHUNKS){ length = REMAINDER; }
227  else { length = CHUNKLENGTH; }
228 
229  logger()<<"rank "<<getRank()<<": chunk "<<i<<": reading entries "<<offset<<" to "<<offset+length<<"."<<EOM;
230 
231  const std::vector<unsigned long long> pID_chunk = pointIDs.get_chunk(offset,length);
232  const std::vector<int> pIDvalid_chunk = pointIDs_isvalid.get_chunk(offset,length);
233  const std::vector<int> rank_chunk = mpiranks.get_chunk(offset,length);
234  const std::vector<int> rankvalid_chunk = mpiranks_isvalid.get_chunk(offset,length);
235 
236  // Check that retrieved lengths make sense
237  if (pID_chunk.size() != CHUNKLENGTH)
238  {
239  if(not (i==NCHUNKS and pID_chunk.size()==REMAINDER) )
240  {
241  std::ostringstream errmsg;
242  errmsg << "Error retrieving highest PPID from previous dataset! Size of chunk vector retrieved from pointID dataset ("<<pID_chunk.size()<<") does not match CHUNKLENGTH ("<<CHUNKLENGTH<<"), nor the expected remainder for the last chunk ("<<REMAINDER<<"). This probably indicates a bug in the DataSetInterfaceScalar.get_chunk routine, please report it. Error occurred while reading chunk i="<<i<<std::endl;
243  printer_error().raise(LOCAL_INFO, errmsg.str());
244  }
245  }
246  if( (pID_chunk.size() != pIDvalid_chunk.size())
247  or (rank_chunk.size() != rankvalid_chunk.size())
248  or (pID_chunk.size() != rank_chunk.size()) )
249  {
250  std::ostringstream errmsg;
251  errmsg << "Error retrieving highest PPID from previous dataset! Unequal chunk lengths retrieved while iterating through in pointID and MPIrank datasets:" <<std::endl;
252  errmsg << " pID_chunk.size() = " << pID_chunk.size() << std::endl;
253  errmsg << " pIDvalid_chunk.size() = " << pIDvalid_chunk.size() << std::endl;
254  errmsg << " rank_chunk.size() = " << rank_chunk.size() << std::endl;
255  errmsg << " rankvalid_chunk.size()= " << rankvalid_chunk.size() << std::endl;
256  errmsg << " CHUNKLENGTH = " << CHUNKLENGTH << std::endl;
257  errmsg << "This indicates either a bug in the HDF5printer or corruption of the datasets (possibly due to unsafe shutdown). Error occurred while reading chunk i="<<i<<std::endl;
258  printer_error().raise(LOCAL_INFO, errmsg.str());
259  }
260 
261  // Iterate within the chunk
262  for(std::size_t j=0; j<length; ++j)
263  {
264  //Check validity flags agree
265  if(pIDvalid_chunk[j] != rankvalid_chunk[j])
266  {
267  std::ostringstream errmsg;
268  errmsg << "Error retrieving highest PPID from previous dataset! Incompatible validity flags detected in pointID_isvalid and MPIrank_isvalid datasets at position j="<<j<<" in chunk i="<<i<<"(with CHUNKLENGTH="<<CHUNKLENGTH<<"). Specifically:"<<std::endl;
269  errmsg << " pIDvalid_chunk[j] = " << pIDvalid_chunk[j] << std::endl;
270  errmsg << " rankvalid_chunk[j] = " << rankvalid_chunk[j] << std::endl;
271  errmsg << "This most likely indicates a bug in the HDF5printer, but could indicate corruption of the datasets (possibly due to unsafe shutdown). Please report it.";
272  printer_error().raise(LOCAL_INFO, errmsg.str());
273  }
274 
275  //std::cerr<<"rank "<<getRank()<<": Entry (valid="<<pIDvalid_chunk[j]<<"): rank="<<rank_chunk[j]<<" , pointID="<<pID_chunk[j]<<std::endl;
276 
277  // Continue only if entry is marked as "valid" and corresponds to our rank
278  if(rankvalid_chunk[j])// and rank_chunk[j]==getRank())
279  {
280  // Test the pointID for this point to see if it is the highest so far.
281  if(pID_chunk[j] > highest_pointIDs[rank_chunk[j]])
282  {
283  highest_pointIDs[rank_chunk[j]] = pID_chunk[j];
284  //std::cerr<<"rank "<<getRank()<<": new highest pointID found = "<<highest_pointID<<std::endl;
285  }
286  }
287  // else continue iteration
288  }
289  }
290 
291  // Return the highest ID found (-1 if none)
292  return highest_pointIDs; //PPIDpair(highest_pointID,getRank());
293  }
294 
295  // We are going to have to combine this data with information from the
296  // scanners (using the auxilliary printers). In order to do this efficiently,
297  // we will store the pointIDs and ranks in a dataset seperate from the
298  // bulk of the data (but correlated with it) so that we can quickly search
299  // for records by their pointID and rank, and then write new data to them.
300  //
301  // NOTE: will have to change the auxilliary printers a bit, so that they
302  // communicate what they intend to write back to the main printer... or something.
303 
304 
306 
307  // Constructor
308  HDF5Printer::HDF5Printer(const Options& options, BasePrinter* const primary)
309  : BasePrinter(primary,options.getValueOrDef<bool>(false,"auxilliary"))
310  , lastPointID(nullpoint)
311  , printer_name("Primary printer")
312  , myRank(0)
313  , mpiSize(1)
314 #ifdef WITH_MPI
315  , myComm() // initially attaches to MPI_COMM_WORLD
316 #endif
317  {
318  common_constructor(options);
319  }
320 
321  // Get options required to construct a reader object that can read
322  // the previous output of this printer.
324  {
325  Options options;
326  // Set options that we need later to construct a reader object for
327  // previous output, if required.
328  options.setValue("type", "hdf5");
329  options.setValue("file", tmp_comb_file);
330  options.setValue("group", group);
331  return options;
332  }
333 
335  {
336 #ifdef WITH_MPI
337  // Note; here 'myRank' is the REAL mpi rank. Used
338  // for process-specific actions.
339  // the inherited 'getRank' function should be used for
340  // printing "as if" this process is from that rank,
341  // i.e. mostly just for setting point ID codes.
342  // 'myRank' will not change, but getRank() may be
343  // changed by the scanner (e.g. postprocessor).
344  myRank = myComm.Get_rank();
345  this->setRank(myRank);
346 #endif
347 
348  // Disable output combination routines?
349  disable_combine_routines = options.getValueOrDef<bool>(false,"disable_combine_routines");
350 
351  if(not this->is_auxilliary_printer())
352  {
353  // Set up this printer in primary mode
354  DBUG( std::cout << "Constructing Primary HDF5Printer object..." << std::endl; )
355  is_primary_printer = true;
356 
357  set_resume(options.getValue<bool>("resume"));
358 
359  // Set up communicator context for HDF5 printer system
360 #ifdef WITH_MPI
361  myComm.dup(MPI_COMM_WORLD,"HDF5printerComm"); // duplicates MPI_COMM_WORLD
362  mpiSize = myComm.Get_size();
363 #endif
364 
365  std::ostringstream ss;
366  ss << "Primary printer for rank " << myRank;
367  printer_name = ss.str();
368 
369  // Name of file where results should ultimately end up
370  std::ostringstream ff;
371  if(options.hasKey("output_path"))
372  {
373  ff << options.getValue<std::string>("output_path") << "/";
374  }
375  else
376  {
377  ff << options.getValue<std::string>("default_output_path") << "/";
378  }
379  if(options.hasKey("output_file"))
380  {
381  ff << options.getValue<std::string>("output_file");
382  }
383  else
384  {
385  printer_error().raise(LOCAL_INFO, "No 'output_file' entry specified in the options section of the Printer category of the input YAML file. Please add a name there for the output hdf5 file of the scan.");
386  }
387  finalfile = ff.str();
388 
389  // Name of file where combined results from previous (unfinished) runs end up
390  std::ostringstream rename;
391  rename << finalfile << "_temp_combined";
392  tmp_comb_file = rename.str();
393 
394  // HDF5 group (virtual "folder") inside output file in which to store datasets
395  group = options.getValueOrDef<std::string>("/","group");
396 
397  // Delete final target file (or group) if one with same name already exists? (and if we are restarting the run)
398  // This is just for convenience during testing; by default datasets will simply be replaced in/added to
399  // existing target HDF5 files. This lets one combine data from many scans into one file if desired.
400  bool overwrite_file = options.getValueOrDef<bool>(false,"delete_file_on_restart");
401 
402  std::vector<unsigned long long int> highests(mpiSize);
403 
404  if(myRank==0)
405  {
406  // Check whether a readable output file exists with the name that we want to use.
407  std::string msg_finalfile;
408  #ifdef DEBUG_MODE
409  std::cout << "File readable: " << finalfile << " : " << HDF5::checkFileReadable(finalfile, msg_finalfile) <<std::endl;
410  #endif
411  if(HDF5::checkFileReadable(finalfile, msg_finalfile))
412  {
413  if(overwrite_file and not get_resume())
414  {
415  // Note: "not resume" means "start or restart"
416  // Delete existing output file
417  std::ostringstream command;
418  command << "rm -f "<<finalfile;
419  logger() << LogTags::printers << LogTags::info << "Running shell command: " << command.str() << EOM;
420  FILE* fp = popen(command.str().c_str(), "r");
421  if(fp==NULL)
422  {
423  // Error running popen
424  std::ostringstream errmsg;
425  errmsg << "rank "<<myRank<<": Error deleting existing output file (requested by 'delete_file_on_restart' printer option; target filename is "<<finalfile<<")! popen failed to run the command (command was '"<<command.str()<<"')";
426  printer_error().raise(LOCAL_INFO, errmsg.str());
427  }
428  else if(pclose(fp)!=0)
429  {
430  // Command returned exit code!=0, or pclose failed
431  std::ostringstream errmsg;
432  errmsg << "rank "<<myRank<<": Error deleting existing output file (requested by 'delete_file_on_restart' printer option; target filename is "<<finalfile<<")! Shell command failed to executed successfully, see stderr (command was '"<<command.str()<<"').";
433  printer_error().raise(LOCAL_INFO, errmsg.str());
434  }
435  }
436  else
437  {
438  // File exists, so check if 'group' is readable, and throw error if it exists
439  file_id = HDF5::openFile(finalfile);
440  std::string msg_group;
441  std::cout << "Group readable: " << finalfile << " , " << group << " : " << HDF5::checkGroupReadable(file_id, group, msg_group) << std::endl;
442  if(HDF5::checkGroupReadable(file_id, group, msg_group))
443  {
444  // Group already exists, error!
445  std::ostringstream errmsg;
446  errmsg << "Error preparing pre-existing output file '"<<finalfile<<"' for writing via hdf5printer! The requested output group '"<<group<<" already exists in this file! Please take one of the following actions:"<<std::endl;
447  errmsg << " 1. Choose a new group via the 'group' option in the Printer section of your input YAML file;"<<std::endl;
448  errmsg << " 2. Delete the existing group from '"<<finalfile<<"';"<<std::endl;
449  errmsg << " 3. Delete the existing output file, or set 'delete_file_on_restart: true' in your input YAML file to give GAMBIT permission to automatically delete it (applies when -r/--restart flag used);"<<std::endl;
450  errmsg << std::endl;
451  errmsg << "*** Note: This error most commonly occurs when you try to resume a scan that has already finished! ***" <<std::endl;
452  errmsg << std::endl;
453  printer_error().raise(LOCAL_INFO, errmsg.str());
454  }
456  exit(1);
457  }
458  }
459 
460  // Deal with temporary files from previous runs
461  if(get_resume()) // If in resume mode, need to combine any existing process-level temporary files
462  {
463  logger() << LogTags::info << "Checking if temporary files from a previous scan exist" << EOM;
464  std::vector<std::string> tmp_files = find_temporary_files(true); //error if they are inconsistent
465  if(tmp_files.size()!=0)
466  {
467  logger() << LogTags::info << "Found "<<tmp_files.size()<<" temporary files from previous scan; preparing to combine them" << EOM;
468 
469  // This might take a while; for debugging purposes we will time it.
470  std::chrono::time_point<std::chrono::system_clock> start(std::chrono::system_clock::now());
472  std::chrono::time_point<std::chrono::system_clock> end(std::chrono::system_clock::now());
473  std::chrono::duration<double> time_taken = end - start;
474  logger() << LogTags::info << "HDF5 files from previous scan combined successfully. Operation took "<<std::chrono::duration_cast<std::chrono::seconds>(time_taken).count()<<" seconds." << EOM;
475  }
476  else
477  {
478  set_resume(false); //Tell ScannerBit that it shouldn't resume and do not find highest point.
479  logger() << LogTags::info << "No process-level temporary files found; skipping combination step." << EOM;
480  }
481  }
482  else // If we are not in resume mode, need to delete any temporary files left lying around in our target directory from previous incomplete runs
483  {
484  // If everything is ok, delete any existing temporary files, including temporary combined files
485  std::vector<std::string> tmp_files = find_temporary_files();
486  tmp_files.push_back(tmp_comb_file); // Adds temporary combined file to deletion list
487  for(auto it=tmp_files.begin(); it!=tmp_files.end(); ++it)
488  {
489  std::ostringstream command;
490  command << "rm -f "<<*it;
491  logger() << LogTags::printers << LogTags::info << "Running shell command: " << command.str() << EOM;
492  FILE* fp = popen(command.str().c_str(), "r");
493  if(fp==NULL)
494  {
495  // Error running popen
496  std::ostringstream errmsg;
497  errmsg << "rank "<<myRank<<": Error deleting old temporary output file (attempting to do this because '--restart' flag was detected. Target for deletion was "<<*it<<")! popen failed to run the command (command was '"<<command.str()<<"')";
498  printer_error().raise(LOCAL_INFO, errmsg.str());
499  }
500  else if(pclose(fp)!=0)
501  {
502  // Command returned exit code!=0, or pclose failed
503  std::ostringstream errmsg;
504  errmsg << "rank "<<myRank<<": Error deleting old temporary output file (attempting to do this because '--restart' flag was detected. Target for deletion was "<<*it<<")! Shell command failed to execute successfully, please check stderr (command was '"<<command.str()<<"').";
505  printer_error().raise(LOCAL_INFO, errmsg.str());
506  }
507  }
508  } // end if(resume)
509 
510  std::cout <<"Rank "<<myRank<<" resume flag? "<<get_resume()<<std::endl;
511  if(get_resume())
512  {
513  //long highest = 0;
515  std::cout <<"Rank "<<myRank<<": tmp_comb_file readable? "<<HDF5::checkFileReadable(tmp_comb_file)<<"(filename: "<<tmp_comb_file<<")"<<std::endl;
517  {
518  logger() << LogTags::info << "Scanning existing temporary combined output file, to prepare for adding new data" << EOM;
519  // Open HDF5 file
521 
522  // Check that group is readable
523  std::string msg2;
524  if(not HDF5::checkGroupReadable(file_id, group, msg2))
525  {
526  // We are supposed to be resuming, but specified group was not readable in the output file, so we can't.
527  std::ostringstream errmsg;
528  errmsg << "Error! GAMBIT is in resume mode, however the chosen output system (HDF5Printer) was unable to open the specified group ("<<group<<") within the existing output file ("<<tmp_comb_file<<"). Resuming is therefore not possible; aborting run... (see below for IO error message)";
529  errmsg << std::endl << "(Strictly speaking we could allow the run to continue (if the scanner can find its necessary output files from the last run), however the printer output from that run is gone, so most likely the scan needs to start again).";
530  errmsg << std::endl << "IO error message: " << msg2;
531  printer_error().raise(LOCAL_INFO, errmsg.str());
532  }
533 
534  // Open requested group (creating it plus parents if needed)
536 
537  // Get previous highest pointID for our rank from the existing output file
538  // Might take a while, so time it.
539  std::chrono::time_point<std::chrono::system_clock> start(std::chrono::system_clock::now());
540  //PPIDpair highest_PPID
541  std::map<unsigned long, unsigned long long int> highest_PPIDs = get_highest_PPID_from_HDF5(group_id);
542  std::chrono::time_point<std::chrono::system_clock> end(std::chrono::system_clock::now());
543  std::chrono::duration<double> time_taken = end - start;
544 
545  for (size_t rank = 0; rank < mpiSize; rank++ )
546  {
547  auto it = highest_PPIDs.find(rank);
548  if (it != highest_PPIDs.end())
549  highests[rank] = it->second;
550  else
551  highests[rank] = 0;
552  }
553  //highest = highest_PPID.pointID;
554 
555  logger() << LogTags::info << "Extracted highest pointID calculated on rank "<<myRank<<" process during previous scan (it was "<< highests <<") from combined output. Operation took "<<std::chrono::duration_cast<std::chrono::seconds>(time_taken).count()<<" seconds." << EOM;
556 
557  // Cleanup
560  }
561  else
562  {
563  logger() << LogTags::info << "No temporary combined output file found; therefore no previous MPIrank/pointID pairs to parse. Will assume that this is a new run (since -r/--restart flag was not used)." << EOM;
564  }
565 
566  // Use global function get_point_id to fast-forward ScannerBit to the
567  // next unused pointID for this rank (actually we give it the highest known, it will iterate itself)
568  //get_point_id() = highest;
569  }
570  }
571 
572 #ifdef WITH_MPI
573  std::vector<int> resume_int_buf(1);
574  resume_int_buf[0] = get_resume();
575  myComm.Barrier();
576  myComm.Bcast(resume_int_buf, 1, 0);
577  set_resume(resume_int_buf.at(0));
578 
579  if (get_resume())
580  {
581  unsigned long long int highest;
582  myComm.Barrier();
583  myComm.Scatter(highests, highest, 0);
584  get_point_id() = highest;
585  }
586 #else
587  if (get_resume())
588  {
589  get_point_id() = highests[0];
590  }
591 #endif
592 
593  /*if(myRank==0)
594  {
595 #ifdef WITH_MPI
596  // Everyone wait until the master finishes pre-processing of existing files
597  // Calls 'check_for_error_messages' function while waiting, in case master fails to process the files.
598  myComm.allWaitForMasterWithFunc(PPFILES_PASS, check_for_error_messages);
599 #endif
600  }*/
601 
602  // Specify temporary output file name to use for this process
603  // Will combine with data from other processes when run is finished,
604  // or when resuming a run.
605  // TODO: Currently we have to do this even if no MPI is being used. Might just leave this for simplicity.
606  std::ostringstream rename2;
607  rename2 << finalfile << "_temp_" << myRank;
608  tmpfile = rename2.str();
609 
610  // Open requested file
611  bool oldfile;
613  file_id = HDF5::openFile(tmpfile,false,oldfile,'w'); // Don't overwrite existing file; we will check here if it exists (via oldfile) and throw an error if it does.
614  if(oldfile)
615  {
616  std::ostringstream errmsg;
617  errmsg << "Error! HDF5Printer attempted to open a temporary file for storing output data ("<<tmpfile<<"), however it found an existing file of the same name! This is a bug; pre-existing temporary files should already have been analysed and deleted before this point in the code.";
618  printer_error().raise(LOCAL_INFO, errmsg.str());
619  }
620 
621  // Open requested group (creating it plus parents if needed)
623 
624  // Open sub-group for RA datasets
626 
627  // Set the target dataset write location to the chosen group
630 
631  }
632  else
633  {
634  // Set up this printer in auxilliary mode
635  std::ostringstream ss;
636  ss << options.getValue<std::string>("name");
637 #ifdef WITH_MPI
638  ss << " for rank " << myRank;
639 #endif
640  printer_name = ss.str();
641  synchronised = options.getValueOrDef<bool>(true,"synchronised");
642  DBUG( std::cout << "Constructing Auxilliary HDF5Printer object (with name=\""<<printer_name<<"\" synchronised="<<synchronised<<")..." << std::endl; )
643 
644  primary_printer = dynamic_cast<HDF5Printer*>(this->get_primary_printer());
645 
646  // Set resume flag to match primary printer
647  set_resume(primary_printer->get_resume());
648 
649  // Fix up mpi communicator (need to copy the one created by the
650  // primary printer)
651 #ifdef WITH_MPI
652  myComm = primary_printer->get_Comm();
653 #endif
654 
655  // Retrieve the target location for adding new datasets from the primary
656  // printer
659  //startpos = primary_printer->get_startpos(); // OBSOLETE
660  }
661  // Now that communicator is set up, get its properties.
662 #ifdef WITH_MPI
663  myRank = myComm.Get_rank();
664  this->setRank(myRank);
665  mpiSize = myComm.Get_size();
666 #endif
667  }
668 
670  std::vector<std::string> HDF5Printer::find_temporary_files(const bool error_if_inconsistent)
671  {
673  std::pair<std::vector<std::string>,std::vector<size_t>> out = HDF5::find_temporary_files(finalfile);
674  std::vector<std::string> result = out.first;
675  std::vector<size_t> missing = out.second;
676 
677  // Check if all temporary files found (i.e. if output from some rank is missing)
678  if(error_if_inconsistent)
679  {
680  if( missing.size()>0 )
681  {
682  std::ostringstream errmsg;
683  errmsg << "HDF5Printer is attempting to resume from a previous run, but could not locate all the expected temporary output files (found "<<result.size()<<" temporary files, but are missing the files from the following ranks: "<<missing<<")! Resuming is therefore not possible; aborting run...";
684  printer_error().raise(LOCAL_INFO, errmsg.str());
685  }
686  }
687  return result;
688  }
689 
693  {
694  // Need to do combination before trying to get previous points.
695  // Make sure a barrier or similar exists outside this function to make
696  // sure master node does combination before workers try to retrieve
697  // previous points
698  if(not get_resume() or not (myRank==0))
699  {
700  std::ostringstream errmsg;
701  errmsg << "HDF5Printer: Tried to run function 'prepare_and_combined_tmp_files', however GAMBIT is not in 'resume' mode, and this is not the process with rank 0, so this is forbidden. This indicates a bug in the HDF5Printer logic, please report it.";
702  printer_error().raise(LOCAL_INFO, errmsg.str());
703  }
704 
707  << "HDF5Printer is preparing any existing output files from a previous run for resuming..."
708  << EOM;
709  bool combined_file_readable=false;
710  std::string msg;
712  {
713  logger() << LogTags::repeat_to_cout << LogTags::info << "...Existing temporary combined output file was found and is readable" << EOM;
714  combined_file_readable=true;
715  }
716  else
717  {
718  logger() << LogTags::repeat_to_cout << LogTags::info << "...No readable pre-existing temporary combined output file found" << EOM;
719  }
720  // Autodetect temporary files from previous run.
721  logger() << LogTags::info << " Autodetecting temporary files from previous run..." << EOM;
722  std::vector<std::string> tmp_files = find_temporary_files(true);
723 
724  if(tmp_files.size()==0)
725  {
726  logger() << LogTags::repeat_to_cout << LogTags::info << "...No process-level temporary files found. No combination to perform." << EOM;
727  // No temporary files exist
728  // This is ok, could just be starting a new run
729  // But we could also have just finished a run and accidentally tried to continue
730  // it -- the responsibility for checking this is left to the calling code, because
731  // we could also be trying to store the output of this run in a pre-existing hdf5
732  // file, so we cannot assume that a pre-existing file is a problem.
733  }
734  else
735  {
736  logger() << LogTags::repeat_to_cout << LogTags::info << "...Found "<<tmp_files.size()<<" process-level temporary files from a previous run. " << EOM;
737 
738  // Check if we are allowed to run the combine routines
740  {
741  logger() << LogTags::info << " Will now check to see if they are readable." << EOM;
742  // Check if temporary files from previous run are readable.
743  for(auto it=tmp_files.begin(); it!=tmp_files.end(); ++it)
744  {
745  std::string msg2;
746  if(not HDF5::checkFileReadable(*it, msg2))
747  {
748  // We are supposed to be resuming, but no readable output file was found, so we can't.
749  std::ostringstream errmsg;
750  errmsg << "Error! GAMBIT is in resume mode, however the chosen output system (HDF5Printer) could not locate/read all the required temporary files from the previous run (possibly there is no unfinished run to continue from). Resuming is therefore not possible; aborting run... (see below for IO error messages)";
751  errmsg << std::endl << "IO message for temporary combined output file read attempt: ";
752  errmsg << std::endl << " " << msg;
753  errmsg << std::endl << "IO message for temporary uncombined output file read attempt: ";
754  errmsg << std::endl << " " << msg2;
755  printer_error().raise(LOCAL_INFO, errmsg.str());
756  }
757  }
758  // Ok all the temporary files exist: combine them
759  // (but do it in non-resume mode, since any potentially existing output file is unreadable anyway)
760  std::ostringstream logmsg;
761  if(combined_file_readable)
762  {
763  logmsg << " Temporary combined output file detected" << std::endl;
764  logmsg << " (found "<<tmp_comb_file<<")"<<std::endl;
765  logmsg << " Will merge temporary files from last run into this file"<<std::endl;
766  logmsg << " If run completes, results will be moved to "<<finalfile<<std::endl;
767  }
768  else
769  {
770  logmsg << " No temporary combined output file detected" << std::endl;
771  logmsg << " (searched for "<<tmp_comb_file<<")"<<std::endl;
772  logmsg << " Will attempt to create it from temporary files from last run"<<std::endl;
773  logmsg << " If run completes, results will be moved to "<<finalfile<<std::endl;
774  }
775  logmsg << " Detected the following temporary files: " << std::endl;
776  for(auto it=tmp_files.begin(); it!=tmp_files.end(); ++it)
777  {
778  logmsg << " " << *it << std::endl;
779  }
780  logmsg << " Attempting combination into: "<< std::endl;
781  logmsg << " " << tmp_comb_file;
782  logger() << LogTags::printers << LogTags::info << logmsg.str() << EOM;
783  combine_output(tmp_files,false);
784  logger() << LogTags::repeat_to_cout << LogTags::printers << LogTags::info << "...Combination complete!" << EOM;
785  }
786  else
787  {
788  std::ostringstream errmsg;
789  errmsg << " Process level temporary HDF5 output was detected, however the 'disable_combine_routines' option is set for the HDF5 printer plugin. The combine code is therefore not permitted to run, so this job cannot proceed. Please either manually combine the output files, restart the scan, or set this option to 'false'" << std::endl;
790  printer_error().raise(LOCAL_INFO, errmsg.str());
791  }
792  }
793  }
794 
796  // Run by dependency resolver, which supplies the functors with a vector of VertexIDs whose requiresPrinting flags are set to true.
797  void HDF5Printer::initialise(const std::vector<int>& /*printmevec*/)
798  {
799  // Nothing to be done
800  }
801 
803  // Overload the base class virtual destructor
805  {
806  DBUG( std::cout << "Destructing HDF5Printer object (with name=\""<<printer_name<<"\")..." << std::endl; )
807  }
808 
810  void HDF5Printer::finalise(bool abnormal)
811  {
812  // Only the primary_printer should have to do anything here, since it
813  // has access to all the buffers.
815  {
816  std::cout << "Running finalise() routine for HDF5Printer (with name=\""<<printer_name<<"\", early="<<abnormal<<")"<<std::endl;
817  logger() << LogTags::printers << "Running finalise() routine for HDF5Printer (with name=\""<<printer_name<<"\")..." << EOM;
818 
819  // Make sure all the buffers are caught up to the final point.
821  logger() << LogTags::printers << "Print buffers synchronised; flushing them to disk" << EOM;
822  flush();
823  logger() << LogTags::printers << "Final buffer flush done ("<<printer_name<<")"<<EOM;
824 
825  // close HDF5 datasets, groups, and file
826 
827  for(BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
828  {
829  // Tell the buffers that they are done; they should then close the HDF5 datasets that they own.
830  it->second->finalise();
831  }
835 
836  // Check that sync buffers have a consistent length, and that RA buffers have a consistent length
837  // (though each group can have different lengths)
838  unsigned long dset_length = 0;
839  unsigned long RA_dset_length = 0;
840  logger() << LogTags::printers << "Checking for any leftover RA write attempts" << EOM;
841 
842  for(BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
843  {
844  //std::cout << it->second->get_label() <<": "<< it->second->postponed_RA_queue_length() <<std::endl;
845  std::size_t remaining_msgs = it->second->postponed_RA_queue_length();
846  if(remaining_msgs!=0)
847  {
848  std::ostringstream errmsg;
849  errmsg << "Error! There are (N="<<remaining_msgs<<") postponed random-access writes still left unwritten to disk in buffer "<<it->second->get_label()<<" at end of run! This may mean that some sync buffer data was not properly delivered from another process.";
850  printer_error().raise(LOCAL_INFO, errmsg.str());
851  }
852 
853  // Debug: check final lengths
854 #ifdef FINAL_MPI_DEBUG
855  std::cout << "dset: " << it->second->get_label() << ", length:" << it->second->get_dataset_length() << std::endl;
856 #endif
857 
858  if(it->second->is_synchronised())
859  {
860  if(dset_length==0) { dset_length = it->second->get_dataset_length(); }
861  else if(dset_length != it->second->get_dataset_length())
862  {
863  std::ostringstream errmsg;
864  errmsg << "Error! Inconsistency detected in output dataset lengths during hdf5printer::finalise(). Datasets from buffer "<<it->second->get_label()<<" have length "<<it->second->get_dataset_length()<<", but previous sync datasets had length "<<dset_length<<".";
865  printer_error().raise(LOCAL_INFO, errmsg.str());
866  }
867  }
868  else // not synchronised
869  {
870  if(RA_dset_length==0) { RA_dset_length = it->second->get_dataset_length(); }
871  else if(RA_dset_length != it->second->get_dataset_length())
872  {
873  std::ostringstream errmsg;
874  errmsg << "Error! Inconsistency detected in output dataset lengths during hdf5printer::finalise(). Datasets from buffer "<<it->second->get_label()<<" have length "<<it->second->get_dataset_length()<<", but previous RA datasets had length "<<RA_dset_length<<".";
875  printer_error().raise(LOCAL_INFO, errmsg.str());
876  }
877  }
878  }
879 
881  logger() << LogTags::printers << "Double-checking that all print buffers are empty" << EOM;
882  for(BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
883  {
884  if(it->second->is_synchronised())
885  {
886  if(it->second->sync_buffer_is_full()==true)
887  {
888  std::ostringstream errmsg;
889  errmsg << "rank "<<myRank<<": Error! Buffer "<<it->second->get_label()<<" reports sync_buffer_is_full()=true after finalise operations should be complete!";
890  printer_error().raise(LOCAL_INFO, errmsg.str());
891  }
892  if(it->second->sync_buffer_is_empty()==false)
893  {
894  std::ostringstream errmsg;
895  errmsg << "rank "<<myRank<<": Error! Buffer "<<it->second->get_label()<<" reports sync_buffer_is_empty()=false after finalise operations should be complete!";
896  printer_error().raise(LOCAL_INFO, errmsg.str());
897  }
898  }
899  else
900  {
901  if(it->second->postponed_RA_queue_length()!=0)
902  {
903  std::ostringstream errmsg;
904  errmsg << "rank "<<myRank<<": Error! Buffer "<<it->second->get_label()<<" reports postponed_RA_queue_length!=0 (length is "<<it->second->postponed_RA_queue_length()<<") after finalise operations should be complete!";
905  printer_error().raise(LOCAL_INFO, errmsg.str());
906  }
907  if(it->second->get_RA_queue_length()!=0)
908  {
909  std::ostringstream errmsg;
910  errmsg << "rank "<<myRank<<": Error! Buffer "<<it->second->get_label()<<" reports get_RA_queue_length()!=0 (length is "<<it->second->get_RA_queue_length()<<") after finalise operations should be complete!";
911  printer_error().raise(LOCAL_INFO, errmsg.str());
912  }
913  }
914  }
915 
917  // We now need to combine the data from all the different processes
918  // that were creating output during this run. First, make sure all
919  // processess are finished
920  // TODO: This requires that the scanner code did not shut down MPI
921  // already! In principle we could do this another way, e.g. by
922  // getting all processes to write a dummy file that signals that
923  // they are done, but for now let us just use this:
924  // TODO: In principle we shouldn't have to do this combination if
925  // either MPI is not used at all, or if mpiSize==1, however currently
926  // we still have to do it due to the way the RA writes are just
927  // "queued up" in the temporary HDF5 file, rather than actually
928  // written to the correct place. But should make it do the latter
929  // in these special cases.
930  if(not abnormal) // Don't do the combination in case of abnormal termination, since we cannot reliably wait for all the other processes to finish
931  {
932 #ifdef WITH_MPI
933  logger() << LogTags::printers << "We are in normal shutdown mode, meaning that the run has finished and output files should be combined. However, the master process must wait for all workers to write their output to disk before attempting the combination. We are now entering this barrier; if we are master we will wait here; all other processes will just register entry and then continue." << EOM;
934  myComm.masterWaitForAll(FINAL_SYNC);
935  // TODO! What if the master finishes before other processes? Then it will sit here. But what then if an abnormal shutdown signal is received?? Then the other processes will not enter the barrier! This is bad.
936  // To avoid the problem we'd have to make the wait able to monitor for termination signals.
937  // Also could just turn off the auto-combination and make the user "continue" the scan one final time to trigger the combination?
938 #endif
939 
940  logger() << LogTags::printers << "Passed FINAL_SYNC point in HDF5Printer finalise() routine" << EOM;
941 
942  if(myRank==0)
943  {
944  // Make sure all datasets etc are closed before doing this or else errors may occur.
945 
947  {
948  logger() << LogTags::printers << "We are the master process: beginning combination of output files." << EOM;
950  }
951  else
952  {
953  logger() << LogTags::printers << "We are the master process: but 'disable_combine_routines' is set to true. SKIPPING combination of output files." << EOM;
954  }
955  }
956  }
957  else
958  {
959  logger() << LogTags::printers << "rank "<<myRank<<": HDF5Printer is terminating abnormally (usually if some signal, e.g. CTRL-C detected); temporary hdf5 files NOT combined! Combination will be attempted upon resuming the run." << EOM;
960  }
961  logger() << LogTags::printers << "rank "<<myRank<<": HDF5Printer finalise() completed successfully." << EOM;
962  } //end if(is_primary_printer)
963  }
964 
967  void HDF5Printer::combine_output_py(const std::vector<std::string> tmp_files, const bool finalcombine)
968  {
969  std::ostringstream command;
970  std::ostringstream tmp_file_list;
971  for(auto it=tmp_files.begin(); it!=tmp_files.end(); ++it)
972  {
973  tmp_file_list << *it << " ";
974  }
975  command << "python "<< GAMBIT_DIR <<"/Printers/scripts/combine_hdf5.py --delete_tmp "<<tmp_comb_file<<" "<<group<<" "<<tmp_file_list.str()<<" 2>&1";
976  logger() << LogTags::printers << "Running HDF5 data combination script..." << std::endl;
977  logger() << "> " << command.str() << std::endl;
978  logger() << EOM;
979  FILE* fp = popen(command.str().c_str(), "r");
980  if(fp==NULL)
981  {
982  // Error running popen
983  std::ostringstream errmsg;
984  errmsg << "rank "<<myRank<<": Error running HDF5 data combination script during HDF5Printer finalise()! popen failed to run the specified command (command was '"<<command.str()<<"')";
985  printer_error().raise(LOCAL_INFO, errmsg.str());
986  }
987  // Something ran at least; get the stdout (plus redirected stderr)
988  char buffer[2048];
989  // read output into a c++ stream via buffer
990  std::ostringstream output;
991  while(fgets(buffer, sizeof(buffer), fp) != NULL) {
992  output << buffer;
993  }
994  int rc = pclose(fp);
996  logger() << "Stdout/stderr captured from HDF5printer combination script" << std::endl;
997  logger() << "--------------------" << std::endl;
998  logger() << output.str() << std::endl;
999  logger() << "--------------------" << std::endl;
1000  logger() << "end HDF5 combination script output" << EOM;
1001  if(rc!=0)
1002  {
1003  // Python error occurred
1004  std::ostringstream errmsg;
1005  errmsg << "rank "<<myRank<<": Error running HDF5 data combination script during HDF5Printer finalise()! Script ran, but return code != 0 was encountered; stdout and stderr from the system call can be found in the log files.";
1006  printer_error().raise(LOCAL_INFO, errmsg.str());
1007  }
1008  // Otherwise everything should be ok!
1009  if(finalcombine)
1010  {
1011  // This happens only at the end of the run; copy data to user-requested filename
1012  // TODO! This does not permit adding different runs into the same hdf5 file
1013  // Need to make sure Greg's combine code can do this.
1014  std::ostringstream command2;
1015  command2 <<"cp "<<tmp_comb_file<<" "<<finalfile<<" && rm "<<tmp_comb_file; // Note, deletes old file if successful
1016  logger() << LogTags::printers << LogTags::info << "Running shell command: " << command2.str() << EOM;
1017  FILE* fp = popen(command2.str().c_str(), "r");
1018  if(fp==NULL)
1019  {
1020  // Error running popen
1021  std::ostringstream errmsg;
1022  errmsg << "rank "<<myRank<<": Error copying combined HDF5 data to final location during HDF5Printer finalise()! popen failed to run the specified copy (and delete) command (command was '"<<command2.str()<<"')";
1023  printer_error().raise(LOCAL_INFO, errmsg.str());
1024  }
1025  else if(pclose(fp)!=0)
1026  {
1027  // Command returned exit code!=0, or pclose failed
1028  std::ostringstream errmsg;
1029  errmsg << "rank "<<myRank<<": Error copying combined HDF5 data to final location during HDF5Printer finalise()! Shell command failed to execute successfully, please check stderr (command was '"<<command2.str()<<"').";
1030  printer_error().raise(LOCAL_INFO, errmsg.str());
1031  }
1032  // Success!
1033  }
1034  }
1035 
1038  void HDF5Printer::combine_output(const std::vector<std::string> tmp_files, const bool finalcombine)
1039  {
1040  logger() << LogTags::printers << "Running HDF5 data combination..." << EOM;
1041  // Do combination
1042  int num = tmp_files.size(); // We don't actually use their names here, Greg's code assumes that they
1043  // follow a fixed format and they all exist. We check for this before
1044  // running this function, so this should be fine.
1045 
1046  // If we set the second last flag 'true' then Greg's code will assume that a '_temp_combined' output file
1047  // exists, and it will crash if it doesn't. So we need to first check if such a file exists.
1048  bool combined_file_exists = Utils::file_exists(tmp_comb_file); // We already check this externally; pass in as flag?
1049  std::cout<<"combined_file_exists? "<<combined_file_exists<<std::endl;
1050  // Second last bool just tells the routine to delete the temporary files when it is done
1051  // Last flag, if false, tells routines to throw an error if any expected temporary file cannot be opened for any reason
1052  HDF5::combine_hdf5_files(tmp_comb_file, finalfile, group, num, combined_file_exists, true, false);
1053 
1054  // This is just left the same as the combine_output_py version!
1055  if(finalcombine)
1056  {
1057  // This happens only at the end of the run; copy data to user-requested filename
1058  // TODO! This does not permit adding different runs into the same hdf5 file
1059  // Need to make sure Greg's combine code can do this.
1060  std::ostringstream command2;
1061  command2 <<"cp "<<tmp_comb_file<<" "<<finalfile<<" && rm "<<tmp_comb_file; // Note, deletes old file if successful
1062  logger() << LogTags::printers << LogTags::info << "Running shell command: " << command2.str() << EOM;
1063  FILE* fp = popen(command2.str().c_str(), "r");
1064  if(fp==NULL)
1065  {
1066  // Error running popen
1067  std::ostringstream errmsg;
1068  errmsg << "rank "<<myRank<<": Error copying combined HDF5 data to final location during HDF5Printer finalise()! popen failed to run the specified copy (and delete) command (command was '"<<command2.str()<<"')";
1069  printer_error().raise(LOCAL_INFO, errmsg.str());
1070  }
1071  else if(pclose(fp)!=0)
1072  {
1073  // Command returned exit code!=0, or pclose failed
1074  std::ostringstream errmsg;
1075  errmsg << "rank "<<myRank<<": Error copying combined HDF5 data to final location during HDF5Printer finalise()! Shell command failed to execute successfully, please check stderr (command was '"<<command2.str()<<"').";
1076  printer_error().raise(LOCAL_INFO, errmsg.str());
1077  }
1078  // Success!
1079  }
1080 
1081  }
1082 
1083 
1086  {
1087  if(location_id==-1)
1088  {
1089  std::ostringstream errmsg;
1090  errmsg << "Error! Tried to retrieve 'location_id' handle from HDF5Printer, but it is -1! This means the printer has not been set up correctly. This is a bug, please report it.";
1091  printer_error().raise(LOCAL_INFO, errmsg.str());
1092  }
1093  return location_id;
1094  }
1095 
1097  {
1098  if(RA_location_id==-1)
1099  {
1100  std::ostringstream errmsg;
1101  errmsg << "Error! Tried to retrieve 'RA_location' pointer from HDF5Printer, but it is -1! This means the printer has not been set up correctly. This is a bug, please report it.";
1102  printer_error().raise(LOCAL_INFO, errmsg.str());
1103  }
1104  return RA_location_id;
1105  }
1106 
1110  {
1111  error_if_key_exists(all_my_buffers, key, "all_my_buffers");
1112  all_my_buffers[key] = &newbuffer;
1113 
1114  error_if_key_exists(all_buffers, key, "all_buffers");
1115  primary_printer->all_buffers[key] = &newbuffer;
1116  }
1117 
1120  {
1121  bool answer = true;
1122  if( primary_printer->all_buffers.find(key)
1123  == primary_printer->all_buffers.end() )
1124  {
1125  answer = false;
1126  }
1127  return answer;
1128  }
1129 
1140  {
1141  // Check if it is in the lookup map already
1142  std::map<PPIDpair, unsigned long>& lookup = primary_printer->global_index_lookup;
1143  std::vector<PPIDpair>& reverse_lookup = primary_printer->reverse_global_index_lookup;
1144  std::map<PPIDpair, unsigned long>::const_iterator it = lookup.find(ppid);
1145  if ( it != lookup.end() ) {
1146  std::ostringstream errmsg;
1147  errmsg << "Error! Supplied PPID already exists in global_index_lookup map! It should only be added once, so there is a bug in HDF5Printer. Please report this error.";
1148  printer_error().raise(LOCAL_INFO, errmsg.str());
1149  }
1150 
1151  // If the list has reached its max allowed length, flush any queued RA
1152  // writes, clear the list, and increment RA_dset_offset.
1153  if(reverse_lookup.size()==MAX_PPIDPAIRS)
1154  {
1155  for (BaseBufferMap::iterator it = primary_printer->all_buffers.begin(); it != primary_printer->all_buffers.end(); it++)
1156  {
1157  if(it->second->is_synchronised())
1158  { /*do nothing, these are the sync buffers*/ }
1159  else
1160  {
1161  it->second->RA_flush(primary_printer->global_index_lookup);
1162  }
1163  }
1164  lookup.clear();
1165  reverse_lookup.clear();
1166  primary_printer->RA_dset_offset += MAX_PPIDPAIRS;
1167  }
1168 
1169  // Ok, now safe to add new stuff
1170  lookup[ppid] = reverse_lookup.size() + primary_printer->RA_dset_offset;
1171  reverse_lookup.push_back(ppid);
1172 
1173  // Need to make sure the pointID and MPIrank are stashed at this location in the RA output
1174  // (this should only occur from within a non-synchronised printer, so these should be
1175  // interpreted as RA writes)
1176  unsigned long pointID = ppid.pointID; // unsigned versions were coming out gibberish in python...
1177  unsigned int mpirank = ppid.rank;
1178  //std::cout << "rank "<<myRank<<": adding new RA PPID to list: (" << pointID << "," << mpirank << ")" << std::endl;
1179 
1180  // The ID numbers will be obtained via the 'aux' parameter system, but I think that is fine.
1181  // The call is a little bizarre because these are template functions from the base class, which
1182  // require this weird notation to resolve a compiler abiguity.
1183  this->print(pointID, "RA_pointID", mpirank, pointID);
1184  this->print(mpirank, "RA_MPIrank", mpirank, pointID);
1185  }
1186 
1189  {
1193  }
1194 
1197  {
1198  bool result = false;
1199  std::map<PPIDpair, unsigned long>& lookup = primary_printer->global_index_lookup;
1200  if ( lookup.find(ppid) != lookup.end() ) result = true;
1201  return result;
1202  }
1203 
1205  unsigned long HDF5Printer::get_global_index(const unsigned long pointID, const unsigned int mpirank)
1206  {
1207  std::map<PPIDpair, unsigned long>::iterator it;
1208  std::map<PPIDpair, unsigned long>& lookup = primary_printer->global_index_lookup;
1209  it = lookup.find(PPIDpair(pointID,mpirank));
1210  if ( it == lookup.end() )
1211  {
1212 #ifdef DEBUG_MODE
1213  std::cout<<"Contents of global_index_lookup map:"<<std::endl;
1214  for(it = lookup.begin(); it != lookup.end(); it++) {
1215  std::cout<<"[pointID="<<it->first.first<<", mpirank="<<it->first.second<<"] : index="<<it->second<<std::endl;
1216  }
1217 #endif
1218  std::ostringstream errmsg;
1219  errmsg << "Error retrieving global index for pointID="<<pointID<<", mpirank="<<mpirank<<"; no corresponding global index found. This means this point has not yet passed through the primary printer. This function is called in preparation for writing to data files via random access, so possibly something has gone wrong there.";
1220  printer_error().raise(LOCAL_INFO, errmsg.str());
1221  }
1222  return it->second;
1223  }
1224 
1226  // Will move the "write heads" of all buffers to "current_dset_position"
1227  // This should only required one "skip_append" command at most to each buffer; something
1228  // went wrong if more are required.
1230  {
1231  if(is_auxilliary_printer())
1232  {
1233  std::ostringstream errmsg;
1234  errmsg << "Error! synchronise_buffer() called by auxilliary hdf5 printer (name="<<printer_name<<")! Only the primary hdf5 printer is allowed to do this. This is a bug in the HDF5Printer class, please report it.";
1235  printer_error().raise(LOCAL_INFO, errmsg.str());
1236  }
1237 
1238  // Determine the desired sync position
1239  const unsigned long sync_pos = get_sync_pos()-1;
1240 
1241  // Cycle through all buffers and tell them to ensure they are at the right position
1242  // The buffers should throw an error if we are accidentally telling them to go backwards
1243  // or skip too many points or anything they can't do.
1244  // Here though we should only be moving them forward by one position.
1245 #ifdef DEBUG_MODE
1246  std::cout<<"rank "<<myRank<<": Synchronising buffers to position "<<sync_pos<<" (message from printer: "<<printer_name<<", is_auxilliary: "<<is_auxilliary_printer()<<", synchronised: "<<synchronised<<")"<<std::endl;
1247 #endif
1248 
1249  // Do the sync buffers first:
1250  for (BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
1251  {
1252  if(it->second->is_synchronised()) it->second->synchronise_output_to_position(sync_pos);
1253  }
1254 
1255  // Now the RA buffers; they don't have to be the same size as the sync buffers, but it
1256  // is useful if we keep them all the same length. Need two loops; one to find the current
1257  // max length of an RA buffer, and the next to tell them all to adjust themselves to
1258  // that length (which won't actually happen until the next time they need to write their
1259  // data to disk).
1260  // TODO: may not need this, can possible just set them to match "get_N_RApointIDs()".
1261  // These are all the possible RA write position anyway...
1262  unsigned long maxRAlength = get_N_RApointIDs();
1263  //for (BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
1264  //{
1265  // if(not it->second->is_synchronised())
1266  // {
1267  // unsigned long length = it->second->get_dataset_length();
1268  // if(length>maxRAlength) maxRAlength = length;
1269  // }
1270  //}
1271  for (BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
1272  {
1273  if(not it->second->is_synchronised()) it->second->synchronise_output_to_position(maxRAlength);
1274  }
1275  }
1276 
1277  // (this will trigger MPI sends if needed)
1278  // Note that if one sync buffer is full, they should all be full!
1279  // By default this only empties buffers if they are full. Use
1280  // flag to force the flush for the finalise buffer dumps.
1282  {
1283 #ifdef DEBUG_MODE
1284  std::cout<<"rank "<<myRank<<": Emptying sync buffers (if full)..."<<std::endl;
1285 #endif
1286  unsigned int N_sync_buffers = 0;
1287  unsigned int N_were_full = 0;
1288  for (BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
1289  {
1290  if(it->second->is_synchronised())
1291  {
1292  N_sync_buffers += 1;
1293  if(it->second->sync_buffer_is_full() or force)
1294  {
1295 #ifdef DEBUG_MODE
1296  std::cout<<"rank "<<myRank<<": Emptying sync buffer "<<it->second->get_label()<<std::endl;
1297 #endif
1298  N_were_full += 1; // Can get flushed if not full only if force=true
1299  it->second->flush();
1300  }
1301  }
1302  }
1303 
1304  // Make sure that all the buffers were full and have now been cleared!
1305  if(N_were_full != 0 and N_were_full != N_sync_buffers)
1306  {
1307  std::ostringstream errmsg;
1308  errmsg << "Error! Tried to empty all synchronised buffers, but some of them were not full! (N_were_full ("<<N_were_full<<") != N_sync_buffers ("<<N_sync_buffers<<"). This indicates that a loss of synchronisation has occurred, which is a bug in the hdf5printer system. Please report it. (Note: rank="<<myRank<<", printer_name="<<printer_name<<")";
1309  printer_error().raise(LOCAL_INFO, errmsg.str());
1310  }
1311 
1312  // Tell the HDF5 library to flush everything to disk
1313  herr_t err = H5Fflush(file_id, H5F_SCOPE_GLOBAL);
1314  if(err<0)
1315  {
1316  std::ostringstream errmsg;
1317  errmsg << "Error in HDF5Printer while trying to empty all synchronised buffers. Buffers were emptied to the HDF5 backend (seemingly) successfully, however H5Fflush returned an error value ("<<err<<"). That is, an error occurred while the HDF5 system attempted to flush its internally buffered data to disk. (Note: rank="<<myRank<<", printer_name="<<printer_name<<")";
1318  printer_error().raise(LOCAL_INFO, errmsg.str());
1319  }
1320  }
1321 
1326  {
1327  empty_sync_buffers(true); // NOTE: forces flush even if buffers not full
1328 
1329  // Need to do all the sync buffers before the RA buffers, so that at the end of the
1330  // run the sync buffers all get written to disk first, and no RA writes get left
1331  // in the postpone queue due to missing targets.
1332  for (BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
1333  {
1334  if(it->second->is_synchronised())
1335  { // do nothing, these are the sync buffers
1336  //if(not it->second->sync_buffer_is_empty()) {}
1337  }
1338  else
1339  {
1340  it->second->RA_flush(primary_printer->global_index_lookup);
1341  //std::cout<<"rank "<<myRank<<": flushing RA buffer of "<<it->second->get_label()<<std::endl;
1342  }
1343  }
1344 
1345  // Make sure RA datasets are the same length
1346 
1347 
1348  }
1349 
1354  void HDF5Printer::reset(bool force)
1355  {
1356  //#ifdef DEBUG_MODE
1357  std::cout<<"resetting..."<<std::endl;
1358  std::cout<<"is_auxilliary_printer() = "<<is_auxilliary_printer()<<std::endl;
1359  std::cout<<"synchronised = "<<synchronised<<std::endl;
1360  std::cout<<"printer_name = "<<printer_name<<std::endl;
1361  //#endif
1362  if(not force and not this->is_auxilliary_printer())
1363  {
1364  std::ostringstream errmsg;
1365  errmsg << "Error! Tried to call reset() on the primary HDF5Printer (printer_name = "<<printer_name<<")! This would delete all the data from the scan and is not currently allowed! Probably this was called accidentally due to a bug.";
1366  printer_error().raise(LOCAL_INFO, errmsg.str());
1367  }
1368  else if(not force and synchronised)
1369  {
1370  std::ostringstream errmsg;
1371  errmsg << "Error! Tried to call reset() on an auxilliary HDF5Printer (printer_name = "<<printer_name<<") which is synchronised with the primary printer! This would delete all the point-level data written by this printer during the scan and is not currently allowed! Probably this was called accidentally due to a bug.";
1372  printer_error().raise(LOCAL_INFO, errmsg.str());
1373  }
1374  else
1375  {
1376  // Ok safe to do the resets.
1377  for (BaseBufferMap::iterator it = all_my_buffers.begin(); it != all_my_buffers.end(); it++)
1378  {
1379  // SPECIAL CASES!
1380  // We rely on the special RA buffers 'RA_MPIrank' and 'RA_pointID'
1381  // to keep track of which RA entries correspond to which
1382  // sync buffer points. Once written once, we preserve which slots
1383  // in the RA datasets correspond to a particular sync point, even
1384  // after resetting them, because multiple RA streams are all
1385  // writing data into the same hdf5 group. So if we invalidate the
1386  // RA_MPIrank and RA_pointID ranks, then we screw up the matching
1387  // between RA output streams (i.e. non-sync printers).
1388  // Could perhaps use a different groups for each aux printer
1389  // (i.e. stream). each with its own MPIrank and pointID datasets.
1390  // For now though, we will simply not invalidate these datasets.
1391  const std::string& name = it->second->get_label();
1392  if(name!="RA_MPIrank" and name!="RA_pointID")
1393  {
1394  // Reset the rest
1395  it->second->reset(force);
1396  }
1397  }
1398  // Also need to reset the PPID lists
1399  // TODO: The HDF5Printer currently assumes that ALL the auxilliary printers are
1400  // reset together. This is not really what we want, but to deal with it I would
1401  // need to make e.g. a separate "RA" group in the hdf5 output for every aux
1402  // stream, and then get the combine script to combine them all.
1403  reset_PPID_lists();
1404  }
1405  }
1406 
1413  void HDF5Printer::check_sync(const std::string& label, const int sync_type, bool checkall=false)
1414  {
1415  BaseBufferMap* map_to_check=NULL;
1416  if(checkall)
1417  {
1418  if(not is_primary_printer)
1419  {
1420  std::ostringstream errmsg;
1421  errmsg << "Error running 'check_sync'; flag 'checkall' set to true, but this is not the primary printer (it is "<<this->get_printer_name()<<") This is not allowed and is a bug, please fix it.";
1422  printer_error().raise(LOCAL_INFO, errmsg.str());
1423  }
1424  else
1425  {
1426  map_to_check=&all_buffers;
1427  }
1428  }
1429  else
1430  {
1431  map_to_check=&all_my_buffers;
1432  }
1433 
1434  // Explicitly check up on the synchronisation of all the buffers and their
1435  // associated datasets
1436  std::string sync_type_name = "non-perfect";
1437  long int diff; // required difference (dset_head_pos - sync_pos)
1438  if (sync_type==0) {
1439  sync_type_name = "pre-resync (non-perfect)";
1440  }
1441  else if(sync_type==1) {
1442  diff = 1;
1443  sync_type_name = "post-resync (perfect)";
1444  }
1445  else if(sync_type==2) {
1446  diff = 0;
1447  sync_type_name = "post-newpoint (perfect)";
1448  }
1449 
1450  std::cout<<"rank "<<myRank<<": Sync check (type: "<<sync_type_name<<"); "<<label<<std::endl;
1451 
1452 #define ERR_MSG \
1453 std::ostringstream errmsg; \
1454 errmsg << "rank "<<myRank<<": Error! ("<<label<<"; ("<<sync_type_name<<") sync check) Buffers have gone out of sync in printer '"<<printer_name<<"'!"<<std::endl; \
1455 errmsg << " head_pos = " << head_pos << "; name = " << name << std::endl; \
1456 errmsg << " sync_pos = " << sync_pos_plus1-1 << std::endl;
1457 
1458  for(BaseBufferMap::iterator it = map_to_check->begin(); it != map_to_check->end(); it++)
1459  {
1460  if(it->second->is_synchronised())
1461  {
1462  long head_pos = it->second->dset_head_pos();
1463  std::string name = it->second->get_label();
1464  long sync_pos_plus1 = get_sync_pos();
1465 
1466  if(sync_type==0) {
1467  if(head_pos+1 < sync_pos_plus1)
1468  {
1469  ERR_MSG
1470  errmsg << " (head_pos < syncpos) " << std::endl;
1471  printer_error().raise(LOCAL_INFO, errmsg.str());
1472  } else if (head_pos > sync_pos_plus1)
1473  {
1474  ERR_MSG
1475  errmsg << " (head_pos > syncpos + 1) " << std::endl;
1476  printer_error().raise(LOCAL_INFO, errmsg.str());
1477  } // else ok.
1478  }
1479  else if(sync_type==1 or sync_type==2)
1480  {
1481  if(head_pos != sync_pos_plus1-1 + diff)
1482  {
1483  ERR_MSG
1484  errmsg << " (head_pos != syncpos + "<<diff<<") " << std::endl;
1485  printer_error().raise(LOCAL_INFO, errmsg.str());
1486  }
1487  }
1488 
1489  std::cout << " head_pos = " << head_pos << "; name = " << name << std::endl;
1490  std::cout << " sync_pos = " << sync_pos_plus1-1 << std::endl;
1491  //it->second->sync_report();
1492  }
1493  }
1494  }
1495 
1496 
1498  // and perform adjustments needed to prepare the printer.
1499  void HDF5Printer::check_for_new_point(const PPIDpair& candidate_newpoint)
1500  {
1501  if(is_auxilliary_printer())
1502  {
1503  // Redirect task to primary printer
1504  primary_printer->check_for_new_point(candidate_newpoint);
1505  }
1506 
1507  // Check if we have changed target PointIDs since the last print call
1508  if(candidate_newpoint!=lastPointID)
1509  {
1510 
1511 #ifdef MPI_DEBUG
1512  std::cout<<"rank "<<myRank<<": New point detected (lastPointID="<<lastPointID<<", candidate_newpoint="<<candidate_newpoint<<")"<<std::endl;
1513  std::cout<<"rank "<<myRank<<": sync_pos="<<get_sync_pos()<<std::endl;
1514 #endif
1515 
1516  // Explicitly check up on the synchronisation of all the buffers and their
1517  // associated datasets
1518 
1519  // When a new point is detected, the "sync position" should be the
1520  // largest index which has a pointID assigned, which will match the
1521  // head_pos of any buffers which didn't write to that point.
1522  // The other buffers are allowed to be AT MOST one slot ahead of this,
1523  // having done a write for the last point and moved ahead one slot.
1524  //
1525  // At the end of this routine, all the buffers should once again
1526  // have their write heads pointing to both the same buffer index, and
1527  // the same output dataset index, and this index should be equal to
1528  // the number of pointIDs collected so far, minus 1 for the zero indexing
1529  // (with the last ID being the point from which the next print statements
1530  // will arrive).
1531  //
1532  // So, in the end, with sync_pos = reverse_global_index_lookup.size() - 1 = get_N_pointIDs() - 1
1533  // we require
1534  // dset_head_pos() == sync_pos
1535  // for every buffer.
1536  // However, at the beginning, we may have
1537  // dset_head_pos() == sync_pos + 1
1538  // for some buffers, which, after sync_pos is incremented with the new
1539  // pointID, will match the new sync_pos, and the other buffers will need
1540  // to be caught up. Any other state is an error.
1541  // So, in debug mode, we will check this first.
1542 
1543 #ifdef CHECK_SYNC
1544  check_sync("Prelim check (in check_for_new_point)", 0);
1545 #endif
1546 
1547  // Make sure all the buffers are caught up at the old position.
1549 
1550 #ifdef CHECK_SYNC
1551  check_sync("Post-resync check (in check_for_new_point)", 1);
1552 #endif
1553 
1554  // Yep the scanner has moved on, at least as far as the current process sees
1555  lastPointID = candidate_newpoint;
1556 
1557  // Check if the buffers are full and waiting to be emptied
1558  // (this will trigger MPI sends if needed)
1560 
1561 #ifdef CHECK_SYNC
1562  check_sync("Post-buffer-empty check (in check_for_new_point)", 1);
1563 #endif
1564 
1565  // Move forward buffer synchronisation counter by one slot
1567 
1568  // Now the buffers should all be synchronised; check this in debug mode.
1569 #ifdef CHECK_SYNC
1570  check_sync("Post-newpoint check (in check_for_new_point)", 2);
1571 #endif
1572 
1573  // Debugging only! check if buffers are somehow still full...
1574 #ifdef MPI_DEBUG
1575  for (BaseBufferMap::iterator it = all_my_buffers.begin(); it != all_my_buffers.end(); it++)
1576  {
1577  if(it->second->is_synchronised())
1578  {
1579  if(it->second->sync_buffer_is_full())
1580  {
1581  std::ostringstream errmsg;
1582  errmsg << "rank "<<myRank<<": Error! sync buffer with label "<<it->second->get_label()<<" is full! Should not be full at this point (printer="<<printer_name<<")";
1583  printer_error().raise(LOCAL_INFO, errmsg.str());
1584  }
1585  }
1586  }
1587 #endif
1588 
1589  // For resuming, we need to be able to retrieve the pointID and
1590  // MPIrank for every output point. So we need to make sure this
1591  // information is always output, and not rely on the scanner to
1592  // do it. Scanners can output it as well, since not all printers
1593  // may do it automatically (although it would be good if they
1594  // did), but in that case they should use the special ID codes
1595  // below to avoid duplication.
1596  // EDIT: Ok need to do these in the constructor also, since otherwise
1597  // the very first point gets missed.
1600  //_print(candidate_newpoint, "pointID", -1000, myRank, candidate_newpoint);
1601  //_print(myRank, "MPIrank", -1001, myRank, candidate_newpoint);
1602  }
1603  else
1604  {
1605  // no action required
1606  }
1607  }
1608 
1610 
1611  } // end namespace printers
1612 } // end namespace Gambit
1613 
1614 #undef DBUG
1615 #undef DEBUG_MODE
void print(MixMatrix A)
Helper function to print a matrix.
unsigned long get_sync_pos() const
Get the number of pointIDs know to this printer (should correspond to the number of "appends" each ac...
Define overloadings of the stream operator for various containers.
bool seen_PPID_before(const PPIDpair &ppid)
Ask the printer for the highest ID number known for a given rank process (needed for resuming...
BaseBufferMap all_my_buffers
Map containing pointers to all VertexBuffers contained in this printer.
unsigned long long int pointID
unsigned int myRank
MPI rank and size.
EXPORT_SYMBOLS error & printer_error()
Printer errors.
void common_constructor(const Options &)
Tasks common to the various constructors.
hid_t get_location() const
Retrieve pointer to HDF5 location to which datasets are added.
std::vector< PPIDpair > reverse_global_index_lookup
void reset(bool force=false)
Invalidate all data on disk which has been printed by this printer so far, and reset all the buffers ...
unsigned long RA_dset_offset
Offset needed to be added to the reverse lookup in order for it to match the output dataset position ...
#define LOCAL_INFO
Definition: local_info.hpp:34
void combine_output_py(const std::vector< std::string > tmp_files, const bool finalcombine)
Combine temporary hdf5 output files from each process into a single coherent hdf5 file...
hid_t openFile(const std::string &fname, bool overwrite, bool &oldfile, const char access_type='r')
File and group manipulation.
Definition: hdf5tools.cpp:182
PPIDpair lastPointID
ID of the point that this printer is currently working on.
Logging access header for GAMBIT.
EXPORT_SYMBOLS unsigned long long int & get_point_id()
Returns unigue pointid;.
Definitions of new MPI datatypes needed by printers.
void combine_output(const std::vector< std::string > tmp_files, const bool finalcombine)
Combine temporary hdf5 output files from each process into a single coherent hdf5 file This version o...
vertexID / sub-print index pair Identifies individual buffers (I call them VertexBuffer, but actually there can be more than one per vertex)
void reset_PPID_lists()
Completely reset the PPIDlists (e.g. used when printer is reset)
Greg&#39;s code for combining HDF5 output of multiple MPI processes.
std::map< unsigned long, unsigned long long int > get_highest_PPID_from_HDF5(hid_t group_id)
GAMBIT signal handling functions.
void add_PPID_to_list(const PPIDpair &)
Add PPIDpair to global index list.
EXPORT_SYMBOLS bool check_if_shutdown_begun()
Check for signals that early shutdown is required If an MPI message telling us to perform an emergenc...
void synchronise_buffers()
Function to ensure buffers are all synchronised to the same absolute position.
bool hasKey(const args &... keys) const
Getters for key/value pairs (which is all the options node should contain)
General small utility functions.
TYPE getValue(const args &... keys) const
HDF5_BACKEND_TYPES bool is_stream_managed(VBIDpair &key) const
HDF5Printer-specific functions.
void flush()
Empty all the buffers to disk Note: Empty sync buffers will not get flushed, to avoid writing extra b...
HDF5 interface printer class declaration.
void finalise(bool abnormal=false)
Perform final cleanup and write tasks.
std::string get_printer_name()
Get the name of this printer.
EXPORT_SYMBOLS bool file_exists(const std::string &filename)
Check if a file exists.
bool synchronised
Flag to specify whether all buffers created by this printer should be synchronised and iterated along...
void empty_sync_buffers(bool force=false)
Check if the buffers are full and waiting to be emptied By default this only empties buffers if they ...
EXPORT_SYMBOLS SignalData & signaldata()
Retrieve global instance of signal handler options struct.
void initialise(const std::vector< int > &)
Virtual function overloads:
const Logging::endofmessage EOM
Explicit const instance of the end of message struct in Gambit namespace.
Definition: logger.hpp:99
#define DBUG(x)
HDF5Printer * primary_printer
Pointer to the primary printer object.
void check_sync(const std::string &label, const int sync_type, bool checkall)
For debugging: check that buffers are synced correctly Flag sets whether "perfect" sync is required...
#define ERR_MSG
Logging::LogMaster & logger()
Function to retrieve a reference to the Gambit global log object.
Definition: logger.cpp:95
hid_t closeFile(hid_t file)
Close hdf5 file.
Definition: hdf5tools.cpp:92
void combine_hdf5_files(const std::string output_file, const std::string &base_file_name, const std::string &group, const size_t num, const bool resume, const bool cleanup, const bool skip, const std::vector< std::string > input_files=std::vector< std::string >())
START_MODEL dNur_CMB r
EXPORT_SYMBOLS const str & ensure_path_exists(const str &)
Ensure that a path exists (and then return the path, for chaining purposes)
void error_if_key_exists(const std::map< T, U > &m, const T &key, const std::string &tag)
Helper function to check if a VertexBuffer key already exists in a map.
Definition: hdf5printer.hpp:73
bool checkFileReadable(const std::string &fname, std::string &msg)
Check if hdf5 file exists and can be opened in read/write mode.
Definition: hdf5tools.cpp:268
TYPE getValueOrDef(TYPE def, const args &... keys) const
unsigned long get_global_index(const ulong pointID, const uint mpirank)
Retrieve index from global lookup table, with error checking.
Tools for accessing printers.
A simple C++ wrapper for the MPI C bindings.
unsigned long sync_pos
Position to start writing new output.
void prepare_and_combine_tmp_files()
Scan for existing temporary files, in preparation for combining them Should only do this if scan is r...
VertexBuffer abstract interface base class.
Exception objects required for standalone compilation.
hid_t openGroup(hid_t file_id, const std::string &name, bool nocreate=false)
Definition: hdf5tools.cpp:512
void increment_sync_pos()
Move head dataset sync position.
std::map< PPIDpair, unsigned long > global_index_lookup
Map from pointID,thread pairs to absolute dataset indices.
HDF5Printer(const Options &, BasePrinter *const primary=NULL)
Constructor (for construction via inifile options)
std::map< VBIDpair, VertexBufferBase * > BaseBufferMap
Helpful typedefs.
Definition: hdf5printer.hpp:69
Derived dataset interface, with methods for writing scalar records (i.e.
unsigned long get_N_RApointIDs()
Get the number of RA write locations known to the primary printer NOTE: the meaning of this has chang...
std::string printer_name
Label for printer, mostly for more helpful error messages.
bool checkGroupReadable(hid_t location, const std::string &groupname, std::string &msg)
Check if a group exists and can be accessed.
Definition: hdf5tools.cpp:302
bool is_primary_printer
Flag to specify if this is the primary printer or not.
void setValue(const KEYTYPE &key, const VALTYPE &val)
Basic setter, for adding extra options.
EXPORT_SYMBOLS const PPIDpair nullpoint
Define &#39;nullpoint&#39; const.
void insert_buffer(VBIDpair &key, VertexBufferBase &newbuffer)
Add a pointer to a new buffer to the global list.
A collection of tools for interacting with HDF5 databases.
bool disable_combine_routines
Flag to disable combination of hdf5 output (user will have to run the combination routines manually) ...
void check_for_new_point(const PPIDpair &)
Check whether printing to a new parameter space point is about to occur and perform adjustments neede...
pointID / process number pair Used to identify a single parameter space point
TODO: see if we can use this one:
Definition: Analysis.hpp:33
A small wrapper object for &#39;options&#39; nodes.
std::pair< std::vector< std::string >, std::vector< size_t > > find_temporary_files(const std::string &finalfile)
Search for temporary files to be combined.
void check_for_error_messages()
std::vector< std::string > find_temporary_files(const bool error_if_inconsistent=false)
Search the output directory for temporary files (pre-combination)
std::vector< T > get_chunk(std::size_t i, std::size_t length) const
READ methods (perhaps can generalise to non-scalar case, but this doesn&#39;t exist yet for writing anywa...
BaseBufferMap all_buffers
Things which other printers need access to.