gambit is hosted by Hepforge, IPPP Durham
GAMBIT  v1.5.0-2191-ga4742ac
a Global And Modular Bsm Inference Tool
hdf5printer.cpp
Go to the documentation of this file.
1 // GAMBIT: Global and Modular BSM Inference Tool
2 // *********************************************
84 
85 
86 // Standard libraries
87 #include <map>
88 #include <vector>
89 #include <algorithm>
90 #include <ios>
91 #include <sstream>
92 #include <iostream>
93 #include <fstream>
94 #include <iomanip>
95 #include <cstdlib> // For popen in finalise()
96 #include <chrono>
97 
98 // Gambit
103 
104 #include "gambit/cmake/cmake_variables.hpp"
109 #include "gambit/Logs/logger.hpp"
110 
111 // MPI bindings
114 
115 // Switch for debugging output (manual at the moment)
116 
117 #ifdef DEBUG_MODE
118 #define DBUG(x) x
119 #else
120 #define DBUG(x)
121 #endif
122 
123 //#define CHECK_SYNC
124 
125 // Debugging for 'finalise' routine
126 #define FINAL_CHECK_SYNC
127 #define FINAL_DEBUG_MODE
128 
129 // Code!
130 namespace Gambit
131 {
132  namespace Printers
133  {
134 
135  // Locally defined helper struct
136  struct DSetData
137  {
138  // Input data for HDF5 helper functions
139  int rank;
140 
141  // Dataset metadata
142  std::vector<std::string> names;
143  std::vector<unsigned long> lengths;
144 
146  // // Contents of pointID and mpirank datasets
147  // std::vector<unsigned long> pointIDs;
148  // std::vector<int> pointIDs_isvalid;
149  // std::vector<unsigned int> mpiranks;
150  // std::vector<int> mpiranks_isvalid;
151 
152  // Report error
153  std::string local_info;
154  std::string errmsg;
155 
156  DSetData(int r) : rank(r) {}
157  };
158 
159  // Helper function to check for GAMBIT shutdown messages due to errors in other processes
161  {
162  signaldata().check_if_shutdown_begun(); // Will throw a shutdown exception if an emergency shutdown command is received via MPI
163  }
164 
165  // Helper function for examining existing HDF5 file during verification stage
166  // Finds the highest PPID for our rank
167  // (separate function checks datasets for consistent lengths; that should run first)
168 
169  // Note; resumed runs may use different numbers of processes to the initial run.
170  // But this should be no problem; if there are new processes added, the highest
171  // previous PPID number for those ranks is just zero. If there are fewer, then
172  // there will be still be matching old-process ranks for all of the new ranks.
173  //PPIDpair
174  std::map<unsigned long, unsigned long long int> HDF5Printer::get_highest_PPID_from_HDF5(hid_t group_id)
175  {
176  //unsigned long long int highest_pointID = 0; // Highest ID found so far
177  std::map<unsigned long, unsigned long long int> highest_pointIDs;
178 
179  // Chunking variables
180  static const std::size_t CHUNKLENGTH = 1000; // Should be a reasonable value
181 
182  // Interfaces for the datasets
183  // Make sure the types used here don't get out of sync with the types used to write the original datasets
184  // We open the datasets in "resume" mode to access existing dataset, and make "const" to disable writing of new data. i.e. "Read-only" mode.
185  const DataSetInterfaceScalar<unsigned long long, CHUNKLENGTH> pointIDs(group_id, "pointID", true, 'r');
186  const DataSetInterfaceScalar<int, CHUNKLENGTH> pointIDs_isvalid (group_id, "pointID_isvalid", true, 'r');
187  const DataSetInterfaceScalar<int, CHUNKLENGTH> mpiranks (group_id, "MPIrank", true, 'r');
188  const DataSetInterfaceScalar<int, CHUNKLENGTH> mpiranks_isvalid (group_id, "MPIrank_isvalid", true, 'r');
189 
190  // Error check lengths. This should already have been done for all datasets in the group, but
191  // we will double-check these four here.
192  const std::size_t dset_length = pointIDs.dset_length();
193  const std::size_t dset_length2 = pointIDs_isvalid.dset_length();
194  const std::size_t dset_length3 = mpiranks.dset_length();
195  const std::size_t dset_length4 = mpiranks_isvalid.dset_length();
196  if( (dset_length != dset_length2)
197  or (dset_length3 != dset_length4)
198  or (dset_length != dset_length3) )
199  {
200  std::ostringstream errmsg;
201  errmsg << "Error retrieving highest PPID from previous dataset! Unequal dataset lengths detected in pointID and MPIrank datasets:" <<std::endl;
202  errmsg << " pointIDs.dset_length() = " << dset_length << std::endl;
203  errmsg << " pointIDs_isvalid.dset_length() = " << dset_length2 << std::endl;
204  errmsg << " mpiranks.dset_length() = " << dset_length3 << std::endl;
205  errmsg << " mpiranks_isvalid.dset_length() = " << dset_length4 << std::endl;
206  errmsg << "This indicates either a bug in the HDF5printer or corruption of the datasets (possibly due to unsafe shutdown).";
207  printer_error().raise(LOCAL_INFO, errmsg.str());
208  }
209 
210  // Compute number of chunks
211  const std::size_t NCHUNKS = dset_length / CHUNKLENGTH; // Number of FULL chunks
212  const std::size_t REMAINDER = dset_length - (NCHUNKS*CHUNKLENGTH); // leftover after last full chunk
213 
214  std::size_t NCHUNKIT; // Number of chunk iterations to perform
215  if(REMAINDER==0) { NCHUNKIT = NCHUNKS; }
216  else { NCHUNKIT = NCHUNKS+1; } // Need an extra iteration to deal with incomplete chunk
217 
218  logger()<<"Begining iteration through existing HDF5 output for rank "<<getRank()<<", searching for previous highest pointID."<<EOM;
219 
220  // Iterate through dataset in chunks
221  for(std::size_t i=0; i<NCHUNKIT; ++i)
222  {
223  std::size_t offset = i*CHUNKLENGTH;
224  std::size_t length;
225 
226  if(i==NCHUNKS){ length = REMAINDER; }
227  else { length = CHUNKLENGTH; }
228 
229  logger()<<"rank "<<getRank()<<": chunk "<<i<<": reading entries "<<offset<<" to "<<offset+length<<"."<<EOM;
230 
231  const std::vector<unsigned long long> pID_chunk = pointIDs.get_chunk(offset,length);
232  const std::vector<int> pIDvalid_chunk = pointIDs_isvalid.get_chunk(offset,length);
233  const std::vector<int> rank_chunk = mpiranks.get_chunk(offset,length);
234  const std::vector<int> rankvalid_chunk = mpiranks_isvalid.get_chunk(offset,length);
235 
236  // Check that retrieved lengths make sense
237  if (pID_chunk.size() != CHUNKLENGTH)
238  {
239  if(not (i==NCHUNKS and pID_chunk.size()==REMAINDER) )
240  {
241  std::ostringstream errmsg;
242  errmsg << "Error retrieving highest PPID from previous dataset! Size of chunk vector retrieved from pointID dataset ("<<pID_chunk.size()<<") does not match CHUNKLENGTH ("<<CHUNKLENGTH<<"), nor the expected remainder for the last chunk ("<<REMAINDER<<"). This probably indicates a bug in the DataSetInterfaceScalar.get_chunk routine, please report it. Error occurred while reading chunk i="<<i<<std::endl;
243  printer_error().raise(LOCAL_INFO, errmsg.str());
244  }
245  }
246  if( (pID_chunk.size() != pIDvalid_chunk.size())
247  or (rank_chunk.size() != rankvalid_chunk.size())
248  or (pID_chunk.size() != rank_chunk.size()) )
249  {
250  std::ostringstream errmsg;
251  errmsg << "Error retrieving highest PPID from previous dataset! Unequal chunk lengths retrieved while iterating through in pointID and MPIrank datasets:" <<std::endl;
252  errmsg << " pID_chunk.size() = " << pID_chunk.size() << std::endl;
253  errmsg << " pIDvalid_chunk.size() = " << pIDvalid_chunk.size() << std::endl;
254  errmsg << " rank_chunk.size() = " << rank_chunk.size() << std::endl;
255  errmsg << " rankvalid_chunk.size()= " << rankvalid_chunk.size() << std::endl;
256  errmsg << " CHUNKLENGTH = " << CHUNKLENGTH << std::endl;
257  errmsg << "This indicates either a bug in the HDF5printer or corruption of the datasets (possibly due to unsafe shutdown). Error occurred while reading chunk i="<<i<<std::endl;
258  printer_error().raise(LOCAL_INFO, errmsg.str());
259  }
260 
261  // Iterate within the chunk
262  for(std::size_t j=0; j<length; ++j)
263  {
264  //Check validity flags agree
265  if(pIDvalid_chunk[j] != rankvalid_chunk[j])
266  {
267  std::ostringstream errmsg;
268  errmsg << "Error retrieving highest PPID from previous dataset! Incompatible validity flags detected in pointID_isvalid and MPIrank_isvalid datasets at position j="<<j<<" in chunk i="<<i<<"(with CHUNKLENGTH="<<CHUNKLENGTH<<"). Specifically:"<<std::endl;
269  errmsg << " pIDvalid_chunk[j] = " << pIDvalid_chunk[j] << std::endl;
270  errmsg << " rankvalid_chunk[j] = " << rankvalid_chunk[j] << std::endl;
271  errmsg << "This most likely indicates a bug in the HDF5printer, but could indicate corruption of the datasets (possibly due to unsafe shutdown). Please report it.";
272  printer_error().raise(LOCAL_INFO, errmsg.str());
273  }
274 
275  //std::cerr<<"rank "<<getRank()<<": Entry (valid="<<pIDvalid_chunk[j]<<"): rank="<<rank_chunk[j]<<" , pointID="<<pID_chunk[j]<<std::endl;
276 
277  // Continue only if entry is marked as "valid" and corresponds to our rank
278  if(rankvalid_chunk[j])// and rank_chunk[j]==getRank())
279  {
280  // Test the pointID for this point to see if it is the highest so far.
281  if(pID_chunk[j] > highest_pointIDs[rank_chunk[j]])
282  {
283  highest_pointIDs[rank_chunk[j]] = pID_chunk[j];
284  //std::cerr<<"rank "<<getRank()<<": new highest pointID found = "<<highest_pointID<<std::endl;
285  }
286  }
287  // else continue iteration
288  }
289  }
290 
291  // Return the highest ID found (-1 if none)
292  return highest_pointIDs; //PPIDpair(highest_pointID,getRank());
293  }
294 
295  // We are going to have to combine this data with information from the
296  // scanners (using the auxilliary printers). In order to do this efficiently,
297  // we will store the pointIDs and ranks in a dataset seperate from the
298  // bulk of the data (but correlated with it) so that we can quickly search
299  // for records by their pointID and rank, and then write new data to them.
300  //
301  // NOTE: will have to change the auxilliary printers a bit, so that they
302  // communicate what they intend to write back to the main printer... or something.
303 
304 
306 
307  // Constructor
308  HDF5Printer::HDF5Printer(const Options& options, BasePrinter* const primary)
309  : BasePrinter(primary,options.getValueOrDef<bool>(false,"auxilliary"))
310  , lastPointID(nullpoint)
311  , printer_name("Primary printer")
312  , myRank(0)
313  , mpiSize(1)
314 #ifdef WITH_MPI
315  , myComm() // initially attaches to MPI_COMM_WORLD
316 #endif
317  {
318  common_constructor(options);
319  }
320 
321  // Get options required to construct a reader object that can read
322  // the previous output of this printer.
324  {
325  Options options;
326  // Set options that we need later to construct a reader object for
327  // previous output, if required.
328  options.setValue("type", "hdf5");
329  options.setValue("file", tmp_comb_file);
330  options.setValue("group", group);
331  return options;
332  }
333 
335  {
336 #ifdef WITH_MPI
337  // Note; here 'myRank' is the REAL mpi rank. Used
338  // for process-specific actions.
339  // the inherited 'getRank' function should be used for
340  // printing "as if" this process is from that rank,
341  // i.e. mostly just for setting point ID codes.
342  // 'myRank' will not change, but getRank() may be
343  // changed by the scanner (e.g. postprocessor).
344  myRank = myComm.Get_rank();
345  this->setRank(myRank);
346 #endif
347 
348  // Disable output combination routines?
349  disable_combine_routines = options.getValueOrDef<bool>(false,"disable_combine_routines");
350 
351  if(not this->is_auxilliary_printer())
352  {
353  // Set up this printer in primary mode
354  DBUG( std::cout << "Constructing Primary HDF5Printer object..." << std::endl; )
355  is_primary_printer = true;
356 
357  set_resume(options.getValue<bool>("resume"));
358 
359  // Set up communicator context for HDF5 printer system
360 #ifdef WITH_MPI
361  myComm.dup(MPI_COMM_WORLD,"HDF5printerComm"); // duplicates MPI_COMM_WORLD
362  mpiSize = myComm.Get_size();
363 #endif
364 
365  std::ostringstream ss;
366  ss << "Primary printer for rank " << myRank;
367  printer_name = ss.str();
368 
369  // Name of file where results should ultimately end up
370  std::ostringstream ff;
371  if(options.hasKey("output_path"))
372  {
373  ff << options.getValue<std::string>("output_path") << "/";
374  }
375  else
376  {
377  ff << options.getValue<std::string>("default_output_path") << "/";
378  }
379  if(options.hasKey("output_file"))
380  {
381  ff << options.getValue<std::string>("output_file");
382  }
383  else
384  {
385  printer_error().raise(LOCAL_INFO, "No 'output_file' entry specified in the options section of the Printer category of the input YAML file. Please add a name there for the output hdf5 file of the scan.");
386  }
387  finalfile = ff.str();
388 
389  // Name of file where combined results from previous (unfinished) runs end up
390  std::ostringstream rename;
391  rename << finalfile << "_temp_combined";
392  tmp_comb_file = rename.str();
393 
394  // HDF5 group (virtual "folder") inside output file in which to store datasets
395  group = options.getValueOrDef<std::string>("/","group");
396 
397  // Delete final target file (or group) if one with same name already exists? (and if we are restarting the run)
398  // This is just for convenience during testing; by default datasets will simply be replaced in/added to
399  // existing target HDF5 files. This lets one combine data from many scans into one file if desired.
400  bool overwrite_file = options.getValueOrDef<bool>(false,"delete_file_on_restart");
401 
402  std::vector<unsigned long long int> highests(mpiSize);
403 
404  if(myRank==0)
405  {
406  // Check whether a readable output file exists with the name that we want to use.
407  std::string msg_finalfile;
408  #ifdef DEBUG_MODE
409  std::cout << "File readable: " << finalfile << " : " << HDF5::checkFileReadable(finalfile, msg_finalfile) <<std::endl;
410  #endif
411  if(HDF5::checkFileReadable(finalfile, msg_finalfile))
412  {
413  if(overwrite_file and not get_resume())
414  {
415  // Note: "not resume" means "start or restart"
416  // Delete existing output file
417  std::ostringstream command;
418  command << "rm -f "<<finalfile;
419  logger() << LogTags::printers << LogTags::info << "Running shell command: " << command.str() << EOM;
420  FILE* fp = popen(command.str().c_str(), "r");
421  if(fp==NULL)
422  {
423  // Error running popen
424  std::ostringstream errmsg;
425  errmsg << "rank "<<myRank<<": Error deleting existing output file (requested by 'delete_file_on_restart' printer option; target filename is "<<finalfile<<")! popen failed to run the command (command was '"<<command.str()<<"')";
426  printer_error().raise(LOCAL_INFO, errmsg.str());
427  }
428  else if(pclose(fp)!=0)
429  {
430  // Command returned exit code!=0, or pclose failed
431  std::ostringstream errmsg;
432  errmsg << "rank "<<myRank<<": Error deleting existing output file (requested by 'delete_file_on_restart' printer option; target filename is "<<finalfile<<")! Shell command failed to executed successfully, see stderr (command was '"<<command.str()<<"').";
433  printer_error().raise(LOCAL_INFO, errmsg.str());
434  }
435  }
436  else
437  {
438  // File exists, so check if 'group' is readable, and throw error if it exists
439  file_id = HDF5::openFile(finalfile);
440  std::string msg_group;
441  std::cout << "Group readable: " << finalfile << " , " << group << " : " << HDF5::checkGroupReadable(file_id, group, msg_group) << std::endl;
442  if(HDF5::checkGroupReadable(file_id, group, msg_group))
443  {
444  // Group already exists, error!
445  std::ostringstream errmsg;
446  errmsg << "Error preparing pre-existing output file '"<<finalfile<<"' for writing via hdf5printer! The requested output group '"<<group<<" already exists in this file! Please take one of the following actions:"<<std::endl;
447  errmsg << " 1. Choose a new group via the 'group' option in the Printer section of your input YAML file;"<<std::endl;
448  errmsg << " 2. Delete the existing group from '"<<finalfile<<"';"<<std::endl;
449  errmsg << " 3. Delete the existing output file, or set 'delete_file_on_restart: true' in your input YAML file to give GAMBIT permission to automatically delete it (applies when -r/--restart flag used);"<<std::endl;
450  errmsg << std::endl;
451  errmsg << "*** Note: This error most commonly occurs when you try to resume a scan that has already finished! ***" <<std::endl;
452  errmsg << std::endl;
453  printer_error().raise(LOCAL_INFO, errmsg.str());
454  }
456  exit(1);
457  }
458  }
459 
460  // Deal with temporary files from previous runs
461  if(get_resume()) // If in resume mode, need to combine any existing process-level temporary files
462  {
463  logger() << LogTags::info << "Checking if temporary files from a previous scan exist" << EOM;
464  std::vector<std::string> tmp_files = find_temporary_files(true); //error if they are inconsistent
465  if(tmp_files.size()!=0)
466  {
467  logger() << LogTags::info << "Found "<<tmp_files.size()<<" temporary files from previous scan; preparing to combine them" << EOM;
468 
469  // This might take a while; for debugging purposes we will time it.
470  std::chrono::time_point<std::chrono::system_clock> start(std::chrono::system_clock::now());
472  std::chrono::time_point<std::chrono::system_clock> end(std::chrono::system_clock::now());
473  std::chrono::duration<double> time_taken = end - start;
474  logger() << LogTags::info << "HDF5 files from previous scan combined successfully. Operation took "<<std::chrono::duration_cast<std::chrono::seconds>(time_taken).count()<<" seconds." << EOM;
475  }
476  else
477  {
478  logger() << LogTags::info << "No process-level temporary files found; skipping combination step." << EOM;
480  {
481  // There is no combined output either, so disable resume mode
482  std::ostringstream msg;
483  msg<<"No temporary output from a previous scan found (or it is unreadable); this will be treated as a NEW scan";
484  std::cout<<msg.str()<<std::endl;
485  logger() << LogTags::info << msg.str();
486  set_resume(false);
487  }
488  }
489  }
490  else // If we are not in resume mode, need to delete any temporary files left lying around in our target directory from previous incomplete runs
491  {
492  // If everything is ok, delete any existing temporary files, including temporary combined files
493  std::vector<std::string> tmp_files = find_temporary_files();
494  tmp_files.push_back(tmp_comb_file); // Adds temporary combined file to deletion list
495  for(auto it=tmp_files.begin(); it!=tmp_files.end(); ++it)
496  {
497  std::ostringstream command;
498  command << "rm -f "<<*it;
499  logger() << LogTags::printers << LogTags::info << "Running shell command: " << command.str() << EOM;
500  FILE* fp = popen(command.str().c_str(), "r");
501  if(fp==NULL)
502  {
503  // Error running popen
504  std::ostringstream errmsg;
505  errmsg << "rank "<<myRank<<": Error deleting old temporary output file (attempting to do this because '--restart' flag was detected. Target for deletion was "<<*it<<")! popen failed to run the command (command was '"<<command.str()<<"')";
506  printer_error().raise(LOCAL_INFO, errmsg.str());
507  }
508  else if(pclose(fp)!=0)
509  {
510  // Command returned exit code!=0, or pclose failed
511  std::ostringstream errmsg;
512  errmsg << "rank "<<myRank<<": Error deleting old temporary output file (attempting to do this because '--restart' flag was detected. Target for deletion was "<<*it<<")! Shell command failed to execute successfully, please check stderr (command was '"<<command.str()<<"').";
513  printer_error().raise(LOCAL_INFO, errmsg.str());
514  }
515  }
516  } // end if(resume)
517 
518  std::cout <<"Rank "<<myRank<<" resume flag? "<<get_resume()<<std::endl;
519  if(get_resume())
520  {
521  //long highest = 0;
523  std::cout <<"Rank "<<myRank<<": tmp_comb_file readable? "<<HDF5::checkFileReadable(tmp_comb_file)<<"(filename: "<<tmp_comb_file<<")"<<std::endl;
525  {
526  logger() << LogTags::info << "Scanning existing temporary combined output file, to prepare for adding new data" << EOM;
527  // Open HDF5 file
529 
530  // Check that group is readable
531  std::string msg2;
532  if(not HDF5::checkGroupReadable(file_id, group, msg2))
533  {
534  // We are supposed to be resuming, but specified group was not readable in the output file, so we can't.
535  std::ostringstream errmsg;
536  errmsg << "Error! GAMBIT is in resume mode, however the chosen output system (HDF5Printer) was unable to open the specified group ("<<group<<") within the existing output file ("<<tmp_comb_file<<"). Resuming is therefore not possible; aborting run... (see below for IO error message)";
537  errmsg << std::endl << "(Strictly speaking we could allow the run to continue (if the scanner can find its necessary output files from the last run), however the printer output from that run is gone, so most likely the scan needs to start again).";
538  errmsg << std::endl << "IO error message: " << msg2;
539  printer_error().raise(LOCAL_INFO, errmsg.str());
540  }
541 
542  // Open requested group (creating it plus parents if needed)
544 
545  // Get previous highest pointID for our rank from the existing output file
546  // Might take a while, so time it.
547  std::chrono::time_point<std::chrono::system_clock> start(std::chrono::system_clock::now());
548  //PPIDpair highest_PPID
549  std::map<unsigned long, unsigned long long int> highest_PPIDs = get_highest_PPID_from_HDF5(group_id);
550  std::chrono::time_point<std::chrono::system_clock> end(std::chrono::system_clock::now());
551  std::chrono::duration<double> time_taken = end - start;
552 
553  for (size_t rank = 0; rank < mpiSize; rank++ )
554  {
555  auto it = highest_PPIDs.find(rank);
556  if (it != highest_PPIDs.end())
557  highests[rank] = it->second;
558  else
559  highests[rank] = 0;
560  }
561  //highest = highest_PPID.pointID;
562 
563  logger() << LogTags::info << "Extracted highest pointID calculated on rank "<<myRank<<" process during previous scan (it was "<< highests <<") from combined output. Operation took "<<std::chrono::duration_cast<std::chrono::seconds>(time_taken).count()<<" seconds." << EOM;
564 
565  // Cleanup
568  }
569  else
570  {
571  logger() << LogTags::info << "No temporary combined output file found; therefore no previous MPIrank/pointID pairs to parse. Will assume that this is a new run (since -r/--restart flag was not used)." << EOM;
572  }
573 
574  // Use global function get_point_id to fast-forward ScannerBit to the
575  // next unused pointID for this rank (actually we give it the highest known, it will iterate itself)
576  //get_point_id() = highest;
577  }
578  }
579 
580 #ifdef WITH_MPI
581  std::vector<int> resume_int_buf(1);
582  resume_int_buf[0] = get_resume();
583  myComm.Barrier();
584  myComm.Bcast(resume_int_buf, 1, 0);
585  set_resume(resume_int_buf.at(0));
586 
587  if (get_resume())
588  {
589  unsigned long long int highest;
590  myComm.Barrier();
591  myComm.Scatter(highests, highest, 0);
592  get_point_id() = highest;
593  }
594 #else
595  if (get_resume())
596  {
597  get_point_id() = highests[0];
598  }
599 #endif
600 
601  /*if(myRank==0)
602  {
603 #ifdef WITH_MPI
604  // Everyone wait until the master finishes pre-processing of existing files
605  // Calls 'check_for_error_messages' function while waiting, in case master fails to process the files.
606  myComm.allWaitForMasterWithFunc(PPFILES_PASS, check_for_error_messages);
607 #endif
608  }*/
609 
610  // Specify temporary output file name to use for this process
611  // Will combine with data from other processes when run is finished,
612  // or when resuming a run.
613  // TODO: Currently we have to do this even if no MPI is being used. Might just leave this for simplicity.
614  std::ostringstream rename2;
615  rename2 << finalfile << "_temp_" << myRank;
616  tmpfile = rename2.str();
617 
618  // Open requested file
619  bool oldfile;
621  file_id = HDF5::openFile(tmpfile,false,oldfile,'w'); // Don't overwrite existing file; we will check here if it exists (via oldfile) and throw an error if it does.
622  if(oldfile)
623  {
624  std::ostringstream errmsg;
625  errmsg << "Error! HDF5Printer attempted to open a temporary file for storing output data ("<<tmpfile<<"), however it found an existing file of the same name! This is a bug; pre-existing temporary files should already have been analysed and deleted before this point in the code.";
626  printer_error().raise(LOCAL_INFO, errmsg.str());
627  }
628 
629  // Open requested group (creating it plus parents if needed)
631 
632  // Open sub-group for RA datasets
634 
635  // Set the target dataset write location to the chosen group
638 
639  }
640  else
641  {
642  // Set up this printer in auxilliary mode
643  std::ostringstream ss;
644  ss << options.getValue<std::string>("name");
645 #ifdef WITH_MPI
646  ss << " for rank " << myRank;
647 #endif
648  printer_name = ss.str();
649  synchronised = options.getValueOrDef<bool>(true,"synchronised");
650  DBUG( std::cout << "Constructing Auxilliary HDF5Printer object (with name=\""<<printer_name<<"\" synchronised="<<synchronised<<")..." << std::endl; )
651 
652  primary_printer = dynamic_cast<HDF5Printer*>(this->get_primary_printer());
653 
654  // Set resume flag to match primary printer
655  set_resume(primary_printer->get_resume());
656 
657  // Fix up mpi communicator (need to copy the one created by the
658  // primary printer)
659 #ifdef WITH_MPI
660  myComm = primary_printer->get_Comm();
661 #endif
662 
663  // Retrieve the target location for adding new datasets from the primary
664  // printer
667  //startpos = primary_printer->get_startpos(); // OBSOLETE
668  }
669  // Now that communicator is set up, get its properties.
670 #ifdef WITH_MPI
671  myRank = myComm.Get_rank();
672  this->setRank(myRank);
673  mpiSize = myComm.Get_size();
674 #endif
675  }
676 
678  std::vector<std::string> HDF5Printer::find_temporary_files(const bool error_if_inconsistent)
679  {
681  std::pair<std::vector<std::string>,std::vector<size_t>> out = HDF5::find_temporary_files(finalfile);
682  std::vector<std::string> result = out.first;
683  std::vector<size_t> missing = out.second;
684 
685  // Check if all temporary files found (i.e. if output from some rank is missing)
686  if(error_if_inconsistent)
687  {
688  if( missing.size()>0 )
689  {
690  std::ostringstream errmsg;
691  errmsg << "HDF5Printer is attempting to resume from a previous run, but could not locate all the expected temporary output files (found "<<result.size()<<" temporary files, but are missing the files from the following ranks: "<<missing<<")! Resuming is therefore not possible; aborting run...";
692  printer_error().raise(LOCAL_INFO, errmsg.str());
693  }
694  }
695  return result;
696  }
697 
701  {
702  // Need to do combination before trying to get previous points.
703  // Make sure a barrier or similar exists outside this function to make
704  // sure master node does combination before workers try to retrieve
705  // previous points
706  if(not get_resume() or not (myRank==0))
707  {
708  std::ostringstream errmsg;
709  errmsg << "HDF5Printer: Tried to run function 'prepare_and_combined_tmp_files', however GAMBIT is not in 'resume' mode, and this is not the process with rank 0, so this is forbidden. This indicates a bug in the HDF5Printer logic, please report it.";
710  printer_error().raise(LOCAL_INFO, errmsg.str());
711  }
712 
715  << "HDF5Printer is preparing any existing output files from a previous run for resuming..."
716  << EOM;
717  bool combined_file_readable=false;
718  std::string msg;
720  {
721  logger() << LogTags::repeat_to_cout << LogTags::info << "...Existing temporary combined output file was found and is readable" << EOM;
722  combined_file_readable=true;
723  }
724  else
725  {
726  logger() << LogTags::repeat_to_cout << LogTags::info << "...No readable pre-existing temporary combined output file found" << EOM;
727  }
728  // Autodetect temporary files from previous run.
729  logger() << LogTags::info << " Autodetecting temporary files from previous run..." << EOM;
730  std::vector<std::string> tmp_files = find_temporary_files(true);
731 
732  if(tmp_files.size()==0)
733  {
734  logger() << LogTags::repeat_to_cout << LogTags::info << "...No process-level temporary files found. No combination to perform." << EOM;
735  // No temporary files exist
736  // This is ok, could just be starting a new run
737  // But we could also have just finished a run and accidentally tried to continue
738  // it -- the responsibility for checking this is left to the calling code, because
739  // we could also be trying to store the output of this run in a pre-existing hdf5
740  // file, so we cannot assume that a pre-existing file is a problem.
741  }
742  else
743  {
744  logger() << LogTags::repeat_to_cout << LogTags::info << "...Found "<<tmp_files.size()<<" process-level temporary files from a previous run. " << EOM;
745 
746  // Check if we are allowed to run the combine routines
748  {
749  logger() << LogTags::info << " Will now check to see if they are readable." << EOM;
750  // Check if temporary files from previous run are readable.
751  for(auto it=tmp_files.begin(); it!=tmp_files.end(); ++it)
752  {
753  std::string msg2;
754  if(not HDF5::checkFileReadable(*it, msg2))
755  {
756  // We are supposed to be resuming, but no readable output file was found, so we can't.
757  std::ostringstream errmsg;
758  errmsg << "Error! GAMBIT is in resume mode, however the chosen output system (HDF5Printer) could not locate/read all the required temporary files from the previous run (possibly there is no unfinished run to continue from). Resuming is therefore not possible; aborting run... (see below for IO error messages)";
759  errmsg << std::endl << "IO message for temporary combined output file read attempt: ";
760  errmsg << std::endl << " " << msg;
761  errmsg << std::endl << "IO message for temporary uncombined output file read attempt: ";
762  errmsg << std::endl << " " << msg2;
763  printer_error().raise(LOCAL_INFO, errmsg.str());
764  }
765  }
766  // Ok all the temporary files exist: combine them
767  // (but do it in non-resume mode, since any potentially existing output file is unreadable anyway)
768  std::ostringstream logmsg;
769  if(combined_file_readable)
770  {
771  logmsg << " Temporary combined output file detected" << std::endl;
772  logmsg << " (found "<<tmp_comb_file<<")"<<std::endl;
773  logmsg << " Will merge temporary files from last run into this file"<<std::endl;
774  logmsg << " If run completes, results will be moved to "<<finalfile<<std::endl;
775  }
776  else
777  {
778  logmsg << " No temporary combined output file detected" << std::endl;
779  logmsg << " (searched for "<<tmp_comb_file<<")"<<std::endl;
780  logmsg << " Will attempt to create it from temporary files from last run"<<std::endl;
781  logmsg << " If run completes, results will be moved to "<<finalfile<<std::endl;
782  }
783  logmsg << " Detected the following temporary files: " << std::endl;
784  for(auto it=tmp_files.begin(); it!=tmp_files.end(); ++it)
785  {
786  logmsg << " " << *it << std::endl;
787  }
788  logmsg << " Attempting combination into: "<< std::endl;
789  logmsg << " " << tmp_comb_file;
790  logger() << LogTags::printers << LogTags::info << logmsg.str() << EOM;
791  combine_output(tmp_files,false);
792  logger() << LogTags::repeat_to_cout << LogTags::printers << LogTags::info << "...Combination complete!" << EOM;
793  }
794  else
795  {
796  std::ostringstream errmsg;
797  errmsg << " Process level temporary HDF5 output was detected, however the 'disable_combine_routines' option is set for the HDF5 printer plugin. The combine code is therefore not permitted to run, so this job cannot proceed. Please either manually combine the output files, restart the scan, or set this option to 'false'" << std::endl;
798  printer_error().raise(LOCAL_INFO, errmsg.str());
799  }
800  }
801  }
802 
804  // Run by dependency resolver, which supplies the functors with a vector of VertexIDs whose requiresPrinting flags are set to true.
805  void HDF5Printer::initialise(const std::vector<int>& /*printmevec*/)
806  {
807  // Nothing to be done
808  }
809 
811  // Overload the base class virtual destructor
813  {
814  DBUG( std::cout << "Destructing HDF5Printer object (with name=\""<<printer_name<<"\")..." << std::endl; )
815  }
816 
818  void HDF5Printer::finalise(bool abnormal)
819  {
820  // Only the primary_printer should have to do anything here, since it
821  // has access to all the buffers.
823  {
824  std::cout << "Running finalise() routine for HDF5Printer (with name=\""<<printer_name<<"\", early="<<abnormal<<")"<<std::endl;
825  logger() << LogTags::printers << "Running finalise() routine for HDF5Printer (with name=\""<<printer_name<<"\")..." << EOM;
826 
827  // Make sure all the buffers are caught up to the final point.
829  logger() << LogTags::printers << "Print buffers synchronised; flushing them to disk" << EOM;
830  flush();
831  logger() << LogTags::printers << "Final buffer flush done ("<<printer_name<<")"<<EOM;
832 
833  // close HDF5 datasets, groups, and file
834 
835  for(BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
836  {
837  // Tell the buffers that they are done; they should then close the HDF5 datasets that they own.
838  it->second->finalise();
839  }
843 
844  // Check that sync buffers have a consistent length, and that RA buffers have a consistent length
845  // (though each group can have different lengths)
846  unsigned long dset_length = 0;
847  unsigned long RA_dset_length = 0;
848  logger() << LogTags::printers << "Checking for any leftover RA write attempts" << EOM;
849 
850  for(BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
851  {
852  //std::cout << it->second->get_label() <<": "<< it->second->postponed_RA_queue_length() <<std::endl;
853  std::size_t remaining_msgs = it->second->postponed_RA_queue_length();
854  if(remaining_msgs!=0)
855  {
856  std::ostringstream errmsg;
857  errmsg << "Error! There are (N="<<remaining_msgs<<") postponed random-access writes still left unwritten to disk in buffer "<<it->second->get_label()<<" at end of run! This may mean that some sync buffer data was not properly delivered from another process.";
858  printer_error().raise(LOCAL_INFO, errmsg.str());
859  }
860 
861  // Debug: check final lengths
862 #ifdef FINAL_MPI_DEBUG
863  std::cout << "dset: " << it->second->get_label() << ", length:" << it->second->get_dataset_length() << std::endl;
864 #endif
865 
866  if(it->second->is_synchronised())
867  {
868  if(dset_length==0) { dset_length = it->second->get_dataset_length(); }
869  else if(dset_length != it->second->get_dataset_length())
870  {
871  std::ostringstream errmsg;
872  errmsg << "Error! Inconsistency detected in output dataset lengths during hdf5printer::finalise(). Datasets from buffer "<<it->second->get_label()<<" have length "<<it->second->get_dataset_length()<<", but previous sync datasets had length "<<dset_length<<".";
873  printer_error().raise(LOCAL_INFO, errmsg.str());
874  }
875  }
876  else // not synchronised
877  {
878  if(RA_dset_length==0) { RA_dset_length = it->second->get_dataset_length(); }
879  else if(RA_dset_length != it->second->get_dataset_length())
880  {
881  std::ostringstream errmsg;
882  errmsg << "Error! Inconsistency detected in output dataset lengths during hdf5printer::finalise(). Datasets from buffer "<<it->second->get_label()<<" have length "<<it->second->get_dataset_length()<<", but previous RA datasets had length "<<RA_dset_length<<".";
883  printer_error().raise(LOCAL_INFO, errmsg.str());
884  }
885  }
886  }
887 
889  logger() << LogTags::printers << "Double-checking that all print buffers are empty" << EOM;
890  for(BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
891  {
892  if(it->second->is_synchronised())
893  {
894  if(it->second->sync_buffer_is_full()==true)
895  {
896  std::ostringstream errmsg;
897  errmsg << "rank "<<myRank<<": Error! Buffer "<<it->second->get_label()<<" reports sync_buffer_is_full()=true after finalise operations should be complete!";
898  printer_error().raise(LOCAL_INFO, errmsg.str());
899  }
900  if(it->second->sync_buffer_is_empty()==false)
901  {
902  std::ostringstream errmsg;
903  errmsg << "rank "<<myRank<<": Error! Buffer "<<it->second->get_label()<<" reports sync_buffer_is_empty()=false after finalise operations should be complete!";
904  printer_error().raise(LOCAL_INFO, errmsg.str());
905  }
906  }
907  else
908  {
909  if(it->second->postponed_RA_queue_length()!=0)
910  {
911  std::ostringstream errmsg;
912  errmsg << "rank "<<myRank<<": Error! Buffer "<<it->second->get_label()<<" reports postponed_RA_queue_length!=0 (length is "<<it->second->postponed_RA_queue_length()<<") after finalise operations should be complete!";
913  printer_error().raise(LOCAL_INFO, errmsg.str());
914  }
915  if(it->second->get_RA_queue_length()!=0)
916  {
917  std::ostringstream errmsg;
918  errmsg << "rank "<<myRank<<": Error! Buffer "<<it->second->get_label()<<" reports get_RA_queue_length()!=0 (length is "<<it->second->get_RA_queue_length()<<") after finalise operations should be complete!";
919  printer_error().raise(LOCAL_INFO, errmsg.str());
920  }
921  }
922  }
923 
925  // We now need to combine the data from all the different processes
926  // that were creating output during this run. First, make sure all
927  // processess are finished
928  // TODO: This requires that the scanner code did not shut down MPI
929  // already! In principle we could do this another way, e.g. by
930  // getting all processes to write a dummy file that signals that
931  // they are done, but for now let us just use this:
932  // TODO: In principle we shouldn't have to do this combination if
933  // either MPI is not used at all, or if mpiSize==1, however currently
934  // we still have to do it due to the way the RA writes are just
935  // "queued up" in the temporary HDF5 file, rather than actually
936  // written to the correct place. But should make it do the latter
937  // in these special cases.
938  if(not abnormal) // Don't do the combination in case of abnormal termination, since we cannot reliably wait for all the other processes to finish
939  {
940 #ifdef WITH_MPI
941  logger() << LogTags::printers << "We are in normal shutdown mode, meaning that the run has finished and output files should be combined. However, the master process must wait for all workers to write their output to disk before attempting the combination. We are now entering this barrier; if we are master we will wait here; all other processes will just register entry and then continue." << EOM;
942  myComm.masterWaitForAll(FINAL_SYNC);
943  // TODO! What if the master finishes before other processes? Then it will sit here. But what then if an abnormal shutdown signal is received?? Then the other processes will not enter the barrier! This is bad.
944  // To avoid the problem we'd have to make the wait able to monitor for termination signals.
945  // Also could just turn off the auto-combination and make the user "continue" the scan one final time to trigger the combination?
946 #endif
947 
948  logger() << LogTags::printers << "Passed FINAL_SYNC point in HDF5Printer finalise() routine" << EOM;
949 
950  if(myRank==0)
951  {
952  // Make sure all datasets etc are closed before doing this or else errors may occur.
953 
955  {
956  logger() << LogTags::printers << "We are the master process: beginning combination of output files." << EOM;
958  }
959  else
960  {
961  logger() << LogTags::printers << "We are the master process: but 'disable_combine_routines' is set to true. SKIPPING combination of output files." << EOM;
962  }
963  }
964  }
965  else
966  {
967  logger() << LogTags::printers << "rank "<<myRank<<": HDF5Printer is terminating abnormally (usually if some signal, e.g. CTRL-C detected); temporary hdf5 files NOT combined! Combination will be attempted upon resuming the run." << EOM;
968  }
969  logger() << LogTags::printers << "rank "<<myRank<<": HDF5Printer finalise() completed successfully." << EOM;
970  } //end if(is_primary_printer)
971  }
972 
975  void HDF5Printer::combine_output_py(const std::vector<std::string> tmp_files, const bool finalcombine)
976  {
977  std::ostringstream command;
978  std::ostringstream tmp_file_list;
979  for(auto it=tmp_files.begin(); it!=tmp_files.end(); ++it)
980  {
981  tmp_file_list << *it << " ";
982  }
983  command << "python "<< GAMBIT_DIR <<"/Printers/scripts/combine_hdf5.py --delete_tmp "<<tmp_comb_file<<" "<<group<<" "<<tmp_file_list.str()<<" 2>&1";
984  logger() << LogTags::printers << "Running HDF5 data combination script..." << std::endl;
985  logger() << "> " << command.str() << std::endl;
986  logger() << EOM;
987  FILE* fp = popen(command.str().c_str(), "r");
988  if(fp==NULL)
989  {
990  // Error running popen
991  std::ostringstream errmsg;
992  errmsg << "rank "<<myRank<<": Error running HDF5 data combination script during HDF5Printer finalise()! popen failed to run the specified command (command was '"<<command.str()<<"')";
993  printer_error().raise(LOCAL_INFO, errmsg.str());
994  }
995  // Something ran at least; get the stdout (plus redirected stderr)
996  char buffer[2048];
997  // read output into a c++ stream via buffer
998  std::ostringstream output;
999  while(fgets(buffer, sizeof(buffer), fp) != NULL) {
1000  output << buffer;
1001  }
1002  int rc = pclose(fp);
1004  logger() << "Stdout/stderr captured from HDF5printer combination script" << std::endl;
1005  logger() << "--------------------" << std::endl;
1006  logger() << output.str() << std::endl;
1007  logger() << "--------------------" << std::endl;
1008  logger() << "end HDF5 combination script output" << EOM;
1009  if(rc!=0)
1010  {
1011  // Python error occurred
1012  std::ostringstream errmsg;
1013  errmsg << "rank "<<myRank<<": Error running HDF5 data combination script during HDF5Printer finalise()! Script ran, but return code != 0 was encountered; stdout and stderr from the system call can be found in the log files.";
1014  printer_error().raise(LOCAL_INFO, errmsg.str());
1015  }
1016  // Otherwise everything should be ok!
1017  if(finalcombine)
1018  {
1019  // This happens only at the end of the run; copy data to user-requested filename
1020  // TODO! This does not permit adding different runs into the same hdf5 file
1021  // Need to make sure Greg's combine code can do this.
1022  std::ostringstream command2;
1023  command2 <<"cp "<<tmp_comb_file<<" "<<finalfile<<" && rm "<<tmp_comb_file; // Note, deletes old file if successful
1024  logger() << LogTags::printers << LogTags::info << "Running shell command: " << command2.str() << EOM;
1025  FILE* fp = popen(command2.str().c_str(), "r");
1026  if(fp==NULL)
1027  {
1028  // Error running popen
1029  std::ostringstream errmsg;
1030  errmsg << "rank "<<myRank<<": Error copying combined HDF5 data to final location during HDF5Printer finalise()! popen failed to run the specified copy (and delete) command (command was '"<<command2.str()<<"')";
1031  printer_error().raise(LOCAL_INFO, errmsg.str());
1032  }
1033  else if(pclose(fp)!=0)
1034  {
1035  // Command returned exit code!=0, or pclose failed
1036  std::ostringstream errmsg;
1037  errmsg << "rank "<<myRank<<": Error copying combined HDF5 data to final location during HDF5Printer finalise()! Shell command failed to execute successfully, please check stderr (command was '"<<command2.str()<<"').";
1038  printer_error().raise(LOCAL_INFO, errmsg.str());
1039  }
1040  // Success!
1041  }
1042  }
1043 
1046  void HDF5Printer::combine_output(const std::vector<std::string> tmp_files, const bool finalcombine)
1047  {
1048  logger() << LogTags::printers << "Running HDF5 data combination..." << EOM;
1049  // Do combination
1050  int num = tmp_files.size(); // We don't actually use their names here, Greg's code assumes that they
1051  // follow a fixed format and they all exist. We check for this before
1052  // running this function, so this should be fine.
1053 
1054  // If we set the second last flag 'true' then Greg's code will assume that a '_temp_combined' output file
1055  // exists, and it will crash if it doesn't. So we need to first check if such a file exists.
1056  bool combined_file_exists = Utils::file_exists(tmp_comb_file); // We already check this externally; pass in as flag?
1057  std::cout<<"combined_file_exists? "<<combined_file_exists<<std::endl;
1058  // Second last bool just tells the routine to delete the temporary files when it is done
1059  // Last flag, if false, tells routines to throw an error if any expected temporary file cannot be opened for any reason
1060  HDF5::combine_hdf5_files(tmp_comb_file, finalfile, group, num, combined_file_exists, true, false);
1061 
1062  // This is just left the same as the combine_output_py version!
1063  if(finalcombine)
1064  {
1065  // This happens only at the end of the run; copy data to user-requested filename
1066  // TODO! This does not permit adding different runs into the same hdf5 file
1067  // Need to make sure Greg's combine code can do this.
1068  std::ostringstream command2;
1069  command2 <<"cp "<<tmp_comb_file<<" "<<finalfile<<" && rm "<<tmp_comb_file; // Note, deletes old file if successful
1070  logger() << LogTags::printers << LogTags::info << "Running shell command: " << command2.str() << EOM;
1071  FILE* fp = popen(command2.str().c_str(), "r");
1072  if(fp==NULL)
1073  {
1074  // Error running popen
1075  std::ostringstream errmsg;
1076  errmsg << "rank "<<myRank<<": Error copying combined HDF5 data to final location during HDF5Printer finalise()! popen failed to run the specified copy (and delete) command (command was '"<<command2.str()<<"')";
1077  printer_error().raise(LOCAL_INFO, errmsg.str());
1078  }
1079  else if(pclose(fp)!=0)
1080  {
1081  // Command returned exit code!=0, or pclose failed
1082  std::ostringstream errmsg;
1083  errmsg << "rank "<<myRank<<": Error copying combined HDF5 data to final location during HDF5Printer finalise()! Shell command failed to execute successfully, please check stderr (command was '"<<command2.str()<<"').";
1084  printer_error().raise(LOCAL_INFO, errmsg.str());
1085  }
1086  // Success!
1087  }
1088 
1089  }
1090 
1091 
1094  {
1095  if(location_id==-1)
1096  {
1097  std::ostringstream errmsg;
1098  errmsg << "Error! Tried to retrieve 'location_id' handle from HDF5Printer, but it is -1! This means the printer has not been set up correctly. This is a bug, please report it.";
1099  printer_error().raise(LOCAL_INFO, errmsg.str());
1100  }
1101  return location_id;
1102  }
1103 
1105  {
1106  if(RA_location_id==-1)
1107  {
1108  std::ostringstream errmsg;
1109  errmsg << "Error! Tried to retrieve 'RA_location' pointer from HDF5Printer, but it is -1! This means the printer has not been set up correctly. This is a bug, please report it.";
1110  printer_error().raise(LOCAL_INFO, errmsg.str());
1111  }
1112  return RA_location_id;
1113  }
1114 
1118  {
1119  error_if_key_exists(all_my_buffers, key, "all_my_buffers");
1120  all_my_buffers[key] = &newbuffer;
1121 
1122  error_if_key_exists(all_buffers, key, "all_buffers");
1123  primary_printer->all_buffers[key] = &newbuffer;
1124  }
1125 
1128  {
1129  bool answer = true;
1130  if( primary_printer->all_buffers.find(key)
1131  == primary_printer->all_buffers.end() )
1132  {
1133  answer = false;
1134  }
1135  return answer;
1136  }
1137 
1148  {
1149  // Check if it is in the lookup map already
1150  std::map<PPIDpair, unsigned long>& lookup = primary_printer->global_index_lookup;
1151  std::vector<PPIDpair>& reverse_lookup = primary_printer->reverse_global_index_lookup;
1152  std::map<PPIDpair, unsigned long>::const_iterator it = lookup.find(ppid);
1153  if ( it != lookup.end() ) {
1154  std::ostringstream errmsg;
1155  errmsg << "Error! Supplied PPID already exists in global_index_lookup map! It should only be added once, so there is a bug in HDF5Printer. Please report this error.";
1156  printer_error().raise(LOCAL_INFO, errmsg.str());
1157  }
1158 
1159  // If the list has reached its max allowed length, flush any queued RA
1160  // writes, clear the list, and increment RA_dset_offset.
1161  if(reverse_lookup.size()==MAX_PPIDPAIRS)
1162  {
1163  for (BaseBufferMap::iterator it = primary_printer->all_buffers.begin(); it != primary_printer->all_buffers.end(); it++)
1164  {
1165  if(it->second->is_synchronised())
1166  { /*do nothing, these are the sync buffers*/ }
1167  else
1168  {
1169  it->second->RA_flush(primary_printer->global_index_lookup);
1170  }
1171  }
1172  lookup.clear();
1173  reverse_lookup.clear();
1174  primary_printer->RA_dset_offset += MAX_PPIDPAIRS;
1175  }
1176 
1177  // Ok, now safe to add new stuff
1178  lookup[ppid] = reverse_lookup.size() + primary_printer->RA_dset_offset;
1179  reverse_lookup.push_back(ppid);
1180 
1181  // Need to make sure the pointID and MPIrank are stashed at this location in the RA output
1182  // (this should only occur from within a non-synchronised printer, so these should be
1183  // interpreted as RA writes)
1184  unsigned long pointID = ppid.pointID; // unsigned versions were coming out gibberish in python...
1185  unsigned int mpirank = ppid.rank;
1186  //std::cout << "rank "<<myRank<<": adding new RA PPID to list: (" << pointID << "," << mpirank << ")" << std::endl;
1187 
1188  // The ID numbers will be obtained via the 'aux' parameter system, but I think that is fine.
1189  // The call is a little bizarre because these are template functions from the base class, which
1190  // require this weird notation to resolve a compiler abiguity.
1191  this->print(pointID, "RA_pointID", mpirank, pointID);
1192  this->print(mpirank, "RA_MPIrank", mpirank, pointID);
1193  }
1194 
1197  {
1201  }
1202 
1205  {
1206  bool result = false;
1207  std::map<PPIDpair, unsigned long>& lookup = primary_printer->global_index_lookup;
1208  if ( lookup.find(ppid) != lookup.end() ) result = true;
1209  return result;
1210  }
1211 
1213  unsigned long HDF5Printer::get_global_index(const unsigned long pointID, const unsigned int mpirank)
1214  {
1215  std::map<PPIDpair, unsigned long>::iterator it;
1216  std::map<PPIDpair, unsigned long>& lookup = primary_printer->global_index_lookup;
1217  it = lookup.find(PPIDpair(pointID,mpirank));
1218  if ( it == lookup.end() )
1219  {
1220 #ifdef DEBUG_MODE
1221  std::cout<<"Contents of global_index_lookup map:"<<std::endl;
1222  for(it = lookup.begin(); it != lookup.end(); it++) {
1223  std::cout<<"[pointID="<<it->first.first<<", mpirank="<<it->first.second<<"] : index="<<it->second<<std::endl;
1224  }
1225 #endif
1226  std::ostringstream errmsg;
1227  errmsg << "Error retrieving global index for pointID="<<pointID<<", mpirank="<<mpirank<<"; no corresponding global index found. This means this point has not yet passed through the primary printer. This function is called in preparation for writing to data files via random access, so possibly something has gone wrong there.";
1228  printer_error().raise(LOCAL_INFO, errmsg.str());
1229  }
1230  return it->second;
1231  }
1232 
1234  // Will move the "write heads" of all buffers to "current_dset_position"
1235  // This should only required one "skip_append" command at most to each buffer; something
1236  // went wrong if more are required.
1238  {
1239  if(is_auxilliary_printer())
1240  {
1241  std::ostringstream errmsg;
1242  errmsg << "Error! synchronise_buffer() called by auxilliary hdf5 printer (name="<<printer_name<<")! Only the primary hdf5 printer is allowed to do this. This is a bug in the HDF5Printer class, please report it.";
1243  printer_error().raise(LOCAL_INFO, errmsg.str());
1244  }
1245 
1246  // Determine the desired sync position
1247  const unsigned long sync_pos = get_sync_pos()-1;
1248 
1249  // Cycle through all buffers and tell them to ensure they are at the right position
1250  // The buffers should throw an error if we are accidentally telling them to go backwards
1251  // or skip too many points or anything they can't do.
1252  // Here though we should only be moving them forward by one position.
1253 #ifdef DEBUG_MODE
1254  std::cout<<"rank "<<myRank<<": Synchronising buffers to position "<<sync_pos<<" (message from printer: "<<printer_name<<", is_auxilliary: "<<is_auxilliary_printer()<<", synchronised: "<<synchronised<<")"<<std::endl;
1255 #endif
1256 
1257  // Do the sync buffers first:
1258  for (BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
1259  {
1260  if(it->second->is_synchronised()) it->second->synchronise_output_to_position(sync_pos);
1261  }
1262 
1263  // Now the RA buffers; they don't have to be the same size as the sync buffers, but it
1264  // is useful if we keep them all the same length. Need two loops; one to find the current
1265  // max length of an RA buffer, and the next to tell them all to adjust themselves to
1266  // that length (which won't actually happen until the next time they need to write their
1267  // data to disk).
1268  // TODO: may not need this, can possible just set them to match "get_N_RApointIDs()".
1269  // These are all the possible RA write position anyway...
1270  unsigned long maxRAlength = get_N_RApointIDs();
1271  //for (BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
1272  //{
1273  // if(not it->second->is_synchronised())
1274  // {
1275  // unsigned long length = it->second->get_dataset_length();
1276  // if(length>maxRAlength) maxRAlength = length;
1277  // }
1278  //}
1279  for (BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
1280  {
1281  if(not it->second->is_synchronised()) it->second->synchronise_output_to_position(maxRAlength);
1282  }
1283  }
1284 
1285  // (this will trigger MPI sends if needed)
1286  // Note that if one sync buffer is full, they should all be full!
1287  // By default this only empties buffers if they are full. Use
1288  // flag to force the flush for the finalise buffer dumps.
1290  {
1291 #ifdef DEBUG_MODE
1292  std::cout<<"rank "<<myRank<<": Emptying sync buffers (if full)..."<<std::endl;
1293 #endif
1294  unsigned int N_sync_buffers = 0;
1295  unsigned int N_were_full = 0;
1296  for (BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
1297  {
1298  if(it->second->is_synchronised())
1299  {
1300  N_sync_buffers += 1;
1301  if(it->second->sync_buffer_is_full() or force)
1302  {
1303 #ifdef DEBUG_MODE
1304  std::cout<<"rank "<<myRank<<": Emptying sync buffer "<<it->second->get_label()<<std::endl;
1305 #endif
1306  N_were_full += 1; // Can get flushed if not full only if force=true
1307  it->second->flush();
1308  }
1309  }
1310  }
1311 
1312  // Make sure that all the buffers were full and have now been cleared!
1313  if(N_were_full != 0 and N_were_full != N_sync_buffers)
1314  {
1315  std::ostringstream errmsg;
1316  errmsg << "Error! Tried to empty all synchronised buffers, but some of them were not full! (N_were_full ("<<N_were_full<<") != N_sync_buffers ("<<N_sync_buffers<<"). This indicates that a loss of synchronisation has occurred, which is a bug in the hdf5printer system. Please report it. (Note: rank="<<myRank<<", printer_name="<<printer_name<<")";
1317  printer_error().raise(LOCAL_INFO, errmsg.str());
1318  }
1319 
1320  // Tell the HDF5 library to flush everything to disk
1321  herr_t err = H5Fflush(file_id, H5F_SCOPE_GLOBAL);
1322  if(err<0)
1323  {
1324  std::ostringstream errmsg;
1325  errmsg << "Error in HDF5Printer while trying to empty all synchronised buffers. Buffers were emptied to the HDF5 backend (seemingly) successfully, however H5Fflush returned an error value ("<<err<<"). That is, an error occurred while the HDF5 system attempted to flush its internally buffered data to disk. (Note: rank="<<myRank<<", printer_name="<<printer_name<<")";
1326  printer_error().raise(LOCAL_INFO, errmsg.str());
1327  }
1328  }
1329 
1334  {
1335  empty_sync_buffers(true); // NOTE: forces flush even if buffers not full
1336 
1337  // Need to do all the sync buffers before the RA buffers, so that at the end of the
1338  // run the sync buffers all get written to disk first, and no RA writes get left
1339  // in the postpone queue due to missing targets.
1340  for (BaseBufferMap::iterator it = all_buffers.begin(); it != all_buffers.end(); it++)
1341  {
1342  if(it->second->is_synchronised())
1343  { // do nothing, these are the sync buffers
1344  //if(not it->second->sync_buffer_is_empty()) {}
1345  }
1346  else
1347  {
1348  it->second->RA_flush(primary_printer->global_index_lookup);
1349  //std::cout<<"rank "<<myRank<<": flushing RA buffer of "<<it->second->get_label()<<std::endl;
1350  }
1351  }
1352 
1353  // Make sure RA datasets are the same length
1354 
1355 
1356  }
1357 
1362  void HDF5Printer::reset(bool force)
1363  {
1364  //#ifdef DEBUG_MODE
1365  std::cout<<"resetting..."<<std::endl;
1366  std::cout<<"is_auxilliary_printer() = "<<is_auxilliary_printer()<<std::endl;
1367  std::cout<<"synchronised = "<<synchronised<<std::endl;
1368  std::cout<<"printer_name = "<<printer_name<<std::endl;
1369  //#endif
1370  if(not force and not this->is_auxilliary_printer())
1371  {
1372  std::ostringstream errmsg;
1373  errmsg << "Error! Tried to call reset() on the primary HDF5Printer (printer_name = "<<printer_name<<")! This would delete all the data from the scan and is not currently allowed! Probably this was called accidentally due to a bug.";
1374  printer_error().raise(LOCAL_INFO, errmsg.str());
1375  }
1376  else if(not force and synchronised)
1377  {
1378  std::ostringstream errmsg;
1379  errmsg << "Error! Tried to call reset() on an auxilliary HDF5Printer (printer_name = "<<printer_name<<") which is synchronised with the primary printer! This would delete all the point-level data written by this printer during the scan and is not currently allowed! Probably this was called accidentally due to a bug.";
1380  printer_error().raise(LOCAL_INFO, errmsg.str());
1381  }
1382  else
1383  {
1384  // Ok safe to do the resets.
1385  for (BaseBufferMap::iterator it = all_my_buffers.begin(); it != all_my_buffers.end(); it++)
1386  {
1387  // SPECIAL CASES!
1388  // We rely on the special RA buffers 'RA_MPIrank' and 'RA_pointID'
1389  // to keep track of which RA entries correspond to which
1390  // sync buffer points. Once written once, we preserve which slots
1391  // in the RA datasets correspond to a particular sync point, even
1392  // after resetting them, because multiple RA streams are all
1393  // writing data into the same hdf5 group. So if we invalidate the
1394  // RA_MPIrank and RA_pointID ranks, then we screw up the matching
1395  // between RA output streams (i.e. non-sync printers).
1396  // Could perhaps use a different groups for each aux printer
1397  // (i.e. stream). each with its own MPIrank and pointID datasets.
1398  // For now though, we will simply not invalidate these datasets.
1399  const std::string& name = it->second->get_label();
1400  if(name!="RA_MPIrank" and name!="RA_pointID")
1401  {
1402  // Reset the rest
1403  it->second->reset(force);
1404  }
1405  }
1406  // Also need to reset the PPID lists
1407  // TODO: The HDF5Printer currently assumes that ALL the auxilliary printers are
1408  // reset together. This is not really what we want, but to deal with it I would
1409  // need to make e.g. a separate "RA" group in the hdf5 output for every aux
1410  // stream, and then get the combine script to combine them all.
1411  reset_PPID_lists();
1412  }
1413  }
1414 
1421  void HDF5Printer::check_sync(const std::string& label, const int sync_type, bool checkall=false)
1422  {
1423  BaseBufferMap* map_to_check=NULL;
1424  if(checkall)
1425  {
1426  if(not is_primary_printer)
1427  {
1428  std::ostringstream errmsg;
1429  errmsg << "Error running 'check_sync'; flag 'checkall' set to true, but this is not the primary printer (it is "<<this->get_printer_name()<<") This is not allowed and is a bug, please fix it.";
1430  printer_error().raise(LOCAL_INFO, errmsg.str());
1431  }
1432  else
1433  {
1434  map_to_check=&all_buffers;
1435  }
1436  }
1437  else
1438  {
1439  map_to_check=&all_my_buffers;
1440  }
1441 
1442  // Explicitly check up on the synchronisation of all the buffers and their
1443  // associated datasets
1444  std::string sync_type_name = "non-perfect";
1445  long int diff; // required difference (dset_head_pos - sync_pos)
1446  if (sync_type==0) {
1447  sync_type_name = "pre-resync (non-perfect)";
1448  }
1449  else if(sync_type==1) {
1450  diff = 1;
1451  sync_type_name = "post-resync (perfect)";
1452  }
1453  else if(sync_type==2) {
1454  diff = 0;
1455  sync_type_name = "post-newpoint (perfect)";
1456  }
1457 
1458  std::cout<<"rank "<<myRank<<": Sync check (type: "<<sync_type_name<<"); "<<label<<std::endl;
1459 
1460 #define ERR_MSG \
1461 std::ostringstream errmsg; \
1462 errmsg << "rank "<<myRank<<": Error! ("<<label<<"; ("<<sync_type_name<<") sync check) Buffers have gone out of sync in printer '"<<printer_name<<"'!"<<std::endl; \
1463 errmsg << " head_pos = " << head_pos << "; name = " << name << std::endl; \
1464 errmsg << " sync_pos = " << sync_pos_plus1-1 << std::endl;
1465 
1466  for(BaseBufferMap::iterator it = map_to_check->begin(); it != map_to_check->end(); it++)
1467  {
1468  if(it->second->is_synchronised())
1469  {
1470  long head_pos = it->second->dset_head_pos();
1471  std::string name = it->second->get_label();
1472  long sync_pos_plus1 = get_sync_pos();
1473 
1474  if(sync_type==0) {
1475  if(head_pos+1 < sync_pos_plus1)
1476  {
1477  ERR_MSG
1478  errmsg << " (head_pos < syncpos) " << std::endl;
1479  printer_error().raise(LOCAL_INFO, errmsg.str());
1480  } else if (head_pos > sync_pos_plus1)
1481  {
1482  ERR_MSG
1483  errmsg << " (head_pos > syncpos + 1) " << std::endl;
1484  printer_error().raise(LOCAL_INFO, errmsg.str());
1485  } // else ok.
1486  }
1487  else if(sync_type==1 or sync_type==2)
1488  {
1489  if(head_pos != sync_pos_plus1-1 + diff)
1490  {
1491  ERR_MSG
1492  errmsg << " (head_pos != syncpos + "<<diff<<") " << std::endl;
1493  printer_error().raise(LOCAL_INFO, errmsg.str());
1494  }
1495  }
1496 
1497  std::cout << " head_pos = " << head_pos << "; name = " << name << std::endl;
1498  std::cout << " sync_pos = " << sync_pos_plus1-1 << std::endl;
1499  //it->second->sync_report();
1500  }
1501  }
1502  }
1503 
1504 
1506  // and perform adjustments needed to prepare the printer.
1507  void HDF5Printer::check_for_new_point(const PPIDpair& candidate_newpoint)
1508  {
1509  if(is_auxilliary_printer())
1510  {
1511  // Redirect task to primary printer
1512  primary_printer->check_for_new_point(candidate_newpoint);
1513  }
1514 
1515  // Check if we have changed target PointIDs since the last print call
1516  if(candidate_newpoint!=lastPointID)
1517  {
1518 
1519 #ifdef MPI_DEBUG
1520  std::cout<<"rank "<<myRank<<": New point detected (lastPointID="<<lastPointID<<", candidate_newpoint="<<candidate_newpoint<<")"<<std::endl;
1521  std::cout<<"rank "<<myRank<<": sync_pos="<<get_sync_pos()<<std::endl;
1522 #endif
1523 
1524  // Explicitly check up on the synchronisation of all the buffers and their
1525  // associated datasets
1526 
1527  // When a new point is detected, the "sync position" should be the
1528  // largest index which has a pointID assigned, which will match the
1529  // head_pos of any buffers which didn't write to that point.
1530  // The other buffers are allowed to be AT MOST one slot ahead of this,
1531  // having done a write for the last point and moved ahead one slot.
1532  //
1533  // At the end of this routine, all the buffers should once again
1534  // have their write heads pointing to both the same buffer index, and
1535  // the same output dataset index, and this index should be equal to
1536  // the number of pointIDs collected so far, minus 1 for the zero indexing
1537  // (with the last ID being the point from which the next print statements
1538  // will arrive).
1539  //
1540  // So, in the end, with sync_pos = reverse_global_index_lookup.size() - 1 = get_N_pointIDs() - 1
1541  // we require
1542  // dset_head_pos() == sync_pos
1543  // for every buffer.
1544  // However, at the beginning, we may have
1545  // dset_head_pos() == sync_pos + 1
1546  // for some buffers, which, after sync_pos is incremented with the new
1547  // pointID, will match the new sync_pos, and the other buffers will need
1548  // to be caught up. Any other state is an error.
1549  // So, in debug mode, we will check this first.
1550 
1551 #ifdef CHECK_SYNC
1552  check_sync("Prelim check (in check_for_new_point)", 0);
1553 #endif
1554 
1555  // Make sure all the buffers are caught up at the old position.
1557 
1558 #ifdef CHECK_SYNC
1559  check_sync("Post-resync check (in check_for_new_point)", 1);
1560 #endif
1561 
1562  // Yep the scanner has moved on, at least as far as the current process sees
1563  lastPointID = candidate_newpoint;
1564 
1565  // Check if the buffers are full and waiting to be emptied
1566  // (this will trigger MPI sends if needed)
1568 
1569 #ifdef CHECK_SYNC
1570  check_sync("Post-buffer-empty check (in check_for_new_point)", 1);
1571 #endif
1572 
1573  // Move forward buffer synchronisation counter by one slot
1575 
1576  // Now the buffers should all be synchronised; check this in debug mode.
1577 #ifdef CHECK_SYNC
1578  check_sync("Post-newpoint check (in check_for_new_point)", 2);
1579 #endif
1580 
1581  // Debugging only! check if buffers are somehow still full...
1582 #ifdef MPI_DEBUG
1583  for (BaseBufferMap::iterator it = all_my_buffers.begin(); it != all_my_buffers.end(); it++)
1584  {
1585  if(it->second->is_synchronised())
1586  {
1587  if(it->second->sync_buffer_is_full())
1588  {
1589  std::ostringstream errmsg;
1590  errmsg << "rank "<<myRank<<": Error! sync buffer with label "<<it->second->get_label()<<" is full! Should not be full at this point (printer="<<printer_name<<")";
1591  printer_error().raise(LOCAL_INFO, errmsg.str());
1592  }
1593  }
1594  }
1595 #endif
1596 
1597  // For resuming, we need to be able to retrieve the pointID and
1598  // MPIrank for every output point. So we need to make sure this
1599  // information is always output, and not rely on the scanner to
1600  // do it. Scanners can output it as well, since not all printers
1601  // may do it automatically (although it would be good if they
1602  // did), but in that case they should use the special ID codes
1603  // below to avoid duplication.
1604  // EDIT: Ok need to do these in the constructor also, since otherwise
1605  // the very first point gets missed.
1608  //_print(candidate_newpoint, "pointID", -1000, myRank, candidate_newpoint);
1609  //_print(myRank, "MPIrank", -1001, myRank, candidate_newpoint);
1610  }
1611  else
1612  {
1613  // no action required
1614  }
1615  }
1616 
1618 
1619  } // end namespace printers
1620 } // end namespace Gambit
1621 
1622 #undef DBUG
1623 #undef DEBUG_MODE
void print(MixMatrix A)
Helper function to print a matrix.
unsigned long get_sync_pos() const
Get the number of pointIDs know to this printer (should correspond to the number of "appends" each ac...
Define overloadings of the stream operator for various containers.
bool seen_PPID_before(const PPIDpair &ppid)
Ask the printer for the highest ID number known for a given rank process (needed for resuming...
BaseBufferMap all_my_buffers
Map containing pointers to all VertexBuffers contained in this printer.
unsigned long long int pointID
unsigned int myRank
MPI rank and size.
EXPORT_SYMBOLS error & printer_error()
Printer errors.
void common_constructor(const Options &)
Tasks common to the various constructors.
hid_t get_location() const
Retrieve pointer to HDF5 location to which datasets are added.
std::vector< PPIDpair > reverse_global_index_lookup
void reset(bool force=false)
Invalidate all data on disk which has been printed by this printer so far, and reset all the buffers ...
unsigned long RA_dset_offset
Offset needed to be added to the reverse lookup in order for it to match the output dataset position ...
#define LOCAL_INFO
Definition: local_info.hpp:34
void combine_output_py(const std::vector< std::string > tmp_files, const bool finalcombine)
Combine temporary hdf5 output files from each process into a single coherent hdf5 file...
hid_t openFile(const std::string &fname, bool overwrite, bool &oldfile, const char access_type='r')
File and group manipulation.
Definition: hdf5tools.cpp:182
PPIDpair lastPointID
ID of the point that this printer is currently working on.
Logging access header for GAMBIT.
EXPORT_SYMBOLS unsigned long long int & get_point_id()
Returns unigue pointid;.
Definitions of new MPI datatypes needed by printers.
void combine_output(const std::vector< std::string > tmp_files, const bool finalcombine)
Combine temporary hdf5 output files from each process into a single coherent hdf5 file This version o...
vertexID / sub-print index pair Identifies individual buffers (I call them VertexBuffer, but actually there can be more than one per vertex)
void reset_PPID_lists()
Completely reset the PPIDlists (e.g. used when printer is reset)
Greg&#39;s code for combining HDF5 output of multiple MPI processes.
std::map< unsigned long, unsigned long long int > get_highest_PPID_from_HDF5(hid_t group_id)
GAMBIT signal handling functions.
void add_PPID_to_list(const PPIDpair &)
Add PPIDpair to global index list.
EXPORT_SYMBOLS bool check_if_shutdown_begun()
Check for signals that early shutdown is required If an MPI message telling us to perform an emergenc...
void synchronise_buffers()
Function to ensure buffers are all synchronised to the same absolute position.
bool hasKey(const args &... keys) const
Getters for key/value pairs (which is all the options node should contain)
General small utility functions.
TYPE getValue(const args &... keys) const
HDF5_BACKEND_TYPES bool is_stream_managed(VBIDpair &key) const
HDF5Printer-specific functions.
void flush()
Empty all the buffers to disk Note: Empty sync buffers will not get flushed, to avoid writing extra b...
HDF5 interface printer class declaration.
void finalise(bool abnormal=false)
Perform final cleanup and write tasks.
std::string get_printer_name()
Get the name of this printer.
EXPORT_SYMBOLS bool file_exists(const std::string &filename)
Check if a file exists.
bool synchronised
Flag to specify whether all buffers created by this printer should be synchronised and iterated along...
void empty_sync_buffers(bool force=false)
Check if the buffers are full and waiting to be emptied By default this only empties buffers if they ...
EXPORT_SYMBOLS SignalData & signaldata()
Retrieve global instance of signal handler options struct.
void initialise(const std::vector< int > &)
Virtual function overloads:
const Logging::endofmessage EOM
Explicit const instance of the end of message struct in Gambit namespace.
Definition: logger.hpp:100
#define DBUG(x)
HDF5Printer * primary_printer
Pointer to the primary printer object.
void check_sync(const std::string &label, const int sync_type, bool checkall)
For debugging: check that buffers are synced correctly Flag sets whether "perfect" sync is required...
#define ERR_MSG
EXPORT_SYMBOLS Logging::LogMaster & logger()
Function to retrieve a reference to the Gambit global log object.
Definition: logger.cpp:95
hid_t closeFile(hid_t file)
Close hdf5 file.
Definition: hdf5tools.cpp:92
void combine_hdf5_files(const std::string output_file, const std::string &base_file_name, const std::string &group, const size_t num, const bool resume, const bool cleanup, const bool skip, const std::vector< std::string > input_files=std::vector< std::string >())
START_MODEL dNur_CMB r
EXPORT_SYMBOLS const str & ensure_path_exists(const str &)
Ensure that a path exists (and then return the path, for chaining purposes)
void error_if_key_exists(const std::map< T, U > &m, const T &key, const std::string &tag)
Helper function to check if a VertexBuffer key already exists in a map.
Definition: hdf5printer.hpp:74
bool checkFileReadable(const std::string &fname, std::string &msg)
Check if hdf5 file exists and can be opened in read/write mode.
Definition: hdf5tools.cpp:268
TYPE getValueOrDef(TYPE def, const args &... keys) const
unsigned long get_global_index(const ulong pointID, const uint mpirank)
Retrieve index from global lookup table, with error checking.
Tools for accessing printers.
A simple C++ wrapper for the MPI C bindings.
unsigned long sync_pos
Position to start writing new output.
void prepare_and_combine_tmp_files()
Scan for existing temporary files, in preparation for combining them Should only do this if scan is r...
VertexBuffer abstract interface base class.
Exception objects required for standalone compilation.
hid_t openGroup(hid_t file_id, const std::string &name, bool nocreate=false)
Definition: hdf5tools.cpp:512
void increment_sync_pos()
Move head dataset sync position.
std::map< PPIDpair, unsigned long > global_index_lookup
Map from pointID,thread pairs to absolute dataset indices.
HDF5Printer(const Options &, BasePrinter *const primary=NULL)
Constructor (for construction via inifile options)
std::map< VBIDpair, VertexBufferBase * > BaseBufferMap
Helpful typedefs.
Definition: hdf5printer.hpp:70
Derived dataset interface, with methods for writing scalar records (i.e.
unsigned long get_N_RApointIDs()
Get the number of RA write locations known to the primary printer NOTE: the meaning of this has chang...
std::string printer_name
Label for printer, mostly for more helpful error messages.
bool checkGroupReadable(hid_t location, const std::string &groupname, std::string &msg)
Check if a group exists and can be accessed.
Definition: hdf5tools.cpp:302
bool is_primary_printer
Flag to specify if this is the primary printer or not.
void setValue(const KEYTYPE &key, const VALTYPE &val)
Basic setter, for adding extra options.
EXPORT_SYMBOLS const PPIDpair nullpoint
Define &#39;nullpoint&#39; const.
void insert_buffer(VBIDpair &key, VertexBufferBase &newbuffer)
Add a pointer to a new buffer to the global list.
A collection of tools for interacting with HDF5 databases.
bool disable_combine_routines
Flag to disable combination of hdf5 output (user will have to run the combination routines manually) ...
void check_for_new_point(const PPIDpair &)
Check whether printing to a new parameter space point is about to occur and perform adjustments neede...
pointID / process number pair Used to identify a single parameter space point
TODO: see if we can use this one:
Definition: Analysis.hpp:33
A small wrapper object for &#39;options&#39; nodes.
std::pair< std::vector< std::string >, std::vector< size_t > > find_temporary_files(const std::string &finalfile)
Search for temporary files to be combined.
void check_for_error_messages()
std::vector< std::string > find_temporary_files(const bool error_if_inconsistent=false)
Search the output directory for temporary files (pre-combination)
std::vector< T > get_chunk(std::size_t i, std::size_t length) const
READ methods (perhaps can generalise to non-scalar case, but this doesn&#39;t exist yet for writing anywa...
BaseBufferMap all_buffers
Things which other printers need access to.