hdf5printer_v2.cpp
1 // GAMBIT: Global and Modular BSM Inference Tool
2 // *********************************************
6 //
16 //
17 #include <math.h>
18 #include <limits>
19 #include <iterator>
23 
24 // Helper to check next item in iteration
25 //template <typename Iter>
26 //Iter next(Iter iter)
27 //{
28 // return ++iter;
29 //}
30 
31 namespace Gambit
32 {
33  namespace Printers
34  {
36 
38  HDF5DataSetBase::HDF5DataSetBase(const std::string& name, const hid_t tid)
39  : _myname(name)
40  , is_open(false)
41  , virtual_dset_length(0)
42  , exists_on_disk(false)
43  , dset_id(-1)
44  , hdftype_id(tid)
45  , type_id(HDF5::inttype_from_h5type(tid))
46  {}
47 
49  HDF5DataSetBase::HDF5DataSetBase(const std::string& name)
50  : _myname(name)
51  , is_open(false)
53  , exists_on_disk(false)
54  , dset_id(-1)
55  , hdftype_id(-1)
56  , type_id(-1)
57  {}
58 
59 
62  {
63  if(is_open)
64  {
65  if(dset_id<0)
66  {
67  std::stringstream errmsg;
68  errmsg<<"Error closing dataset in destructor for HDF5DataSetBase! Dataset is marked 'open' but dset_id<0! This is a bug, please report it.";
69  printer_error().raise(LOCAL_INFO, errmsg.str());
70  }
71  close_dataset();
72  }
73  }
74 
76  std::string HDF5DataSetBase::myname() const { return _myname; }
77 
81 
84  {
86  return dset_id;
87  }
88 
90  int HDF5DataSetBase::get_type_id() const { return type_id; }
91 
94 
97  {
98  bool exists(false);
99  htri_t r = H5Lexists(loc_id, myname().c_str(), H5P_DEFAULT);
100  if(r>0) exists=true;
101  else if(r==0) exists=false;
102  else
103  {
104  // Something else went wrong; error
105  std::ostringstream errmsg;
106  errmsg<<"HDF5 error encountered while checking if dataset named '"<<myname()<<"' exists!";
107  printer_error().raise(LOCAL_INFO, errmsg.str());
108  }
109  if(exists) set_exists_on_disk();
110  return exists;
111  }
112 
114  void HDF5DataSetBase::ensure_dataset_exists(const hid_t loc_id, const std::size_t length)
115  {
116  if(not dataset_exists(loc_id))
117  {
118  // Dataset doesn't exist; create it
119  create_dataset(loc_id);
120  }
121 
122  // Make sure length is correct (add fill values as needed);
123  open_dataset(loc_id);
124  if(get_dset_length()<length)
125  {
126  extend_dset_to(length);
127  }
128  close_dataset();
129  }
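  // Illustrative usage sketch (hypothetical caller and names, not part of this file):
  // ensure_dataset_exists() is the create-if-missing / grow-if-short entry point, so a
  // caller that needs a dataset of at least N rows can simply do
  //
  //   HDF5DataSet<double> dset("LogLike");       // "LogLike" is a hypothetical label
  //   dset.ensure_dataset_exists(group_id, N);   // creates it and/or extends it to length N
  //
  // The dataset is opened and closed internally, so no handles are left dangling.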
130 
133  {
135  return dims[0];
136  //return virtual_dset_length;
137  }
138 
140  //void HDF5DataSetBase::set_dset_length(const std::size_t newsize)
141  //{
142  // virtual_dset_length = newsize;
143  //}
144 
146  void HDF5DataSetBase::extend_dset_to(const std::size_t newlength)
147  {
149  std::size_t current_length = dims[0];
150  if(newlength<current_length)
151  {
152  std::ostringstream errmsg;
153  errmsg << "Failed to extend dataset (with name=\""<<myname()<<"\") from length "<<current_length<<" to length "<<newlength<<"! The new length is short than the existing length! This is a bug, please report it.";
154  printer_error().raise(LOCAL_INFO, errmsg.str());
155  }
156 
157  dims[0] = newlength;
158  herr_t status = H5Dset_extent(get_dset_id(), dims);
159  if(status<0)
160  {
161  std::ostringstream errmsg;
162  errmsg << "Failed to extend dataset (with name=\""<<myname()<<"\") from length "<<current_length<<" to length "<<newlength<<"!";
163  printer_error().raise(LOCAL_INFO, errmsg.str());
164  }
165 
166  // Record the new dataset length
167  //set_dset_length(newlength);
168  }
169 
171  void HDF5DataSetBase::extend_dset_by(const std::size_t extend_by)
172  {
174  std::size_t current_length = get_dset_length();
175  std::size_t newlength = current_length + extend_by;
176  extend_dset_to(newlength);
177  }
178 
181  {
182  if(not is_open)
183  {
184  std::ostringstream errmsg;
185  errmsg << "Error! Dataset (with name=\""<<myname()<<"\") is not open! Code following this check is not permitted to run. This is a bug in HDF5Printer2, please report it.";
186  printer_error().raise(LOCAL_INFO, errmsg.str());
187  }
188  }
189 
192  {
193  if(is_open)
194  {
195  std::ostringstream errmsg;
196  errmsg << "Error opening dataset (with name=\""<<myname()<<"\") in HDF5 file. The dataset is already open! This is a bug, please report it.";
197  printer_error().raise(LOCAL_INFO, errmsg.str());
198  }
199 
200  // Open the dataset
201  dset_id = H5Dopen2(location_id, myname().c_str(), H5P_DEFAULT);
202  if(dset_id<0)
203  {
204  std::ostringstream errmsg;
205  errmsg << "Error opening existing dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Dopen2 failed." << std::endl
206  << "You may have a corrupt hdf5 file from a previous run. Try using -r, or deleting the old output.";
207  printer_error().raise(LOCAL_INFO, errmsg.str());
208  }
209 
210  // Get dataspace of the dataset.
211  hid_t dspace_id = H5Dget_space(dset_id);
212  if(dspace_id<0)
213  {
214  std::ostringstream errmsg;
215  errmsg << "Error opening existing dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Dget_space failed.";
216  printer_error().raise(LOCAL_INFO, errmsg.str());
217  }
218 
219  // Get the number of dimensions in the dataspace.
220  int rank = H5Sget_simple_extent_ndims(dspace_id);
221  if(rank<0)
222  {
223  std::ostringstream errmsg;
224  errmsg << "Error opening existing dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Sget_simple_extent_ndims failed.";
225  printer_error().raise(LOCAL_INFO, errmsg.str());
226  }
227  if(rank!=DSETRANK)
228  {
229  std::ostringstream errmsg;
230  errmsg << "Error while accessing existing dataset (with name=\""<<myname()<<"\") in HDF5 file. Rank of dataset ("<<rank<<") does not match the expected rank ("<<DSETRANK<<").";
231  printer_error().raise(LOCAL_INFO, errmsg.str());
232  }
233 
234  // Get the dimension size of each dimension in the dataspace
235  // now that we know ndims matches DSETRANK.
236  hsize_t dims_out[DSETRANK];
237  int ndims = H5Sget_simple_extent_dims(dspace_id, dims_out, NULL);
238  if(ndims<0)
239  {
240  std::ostringstream errmsg;
241  errmsg << "Error while accessing existing dataset (with name=\""<<myname()<<"\") in HDF5 file. Failed to retrieve dataset extents (H5Sget_simple_extent_dims failed).";
242  printer_error().raise(LOCAL_INFO, errmsg.str());
243  }
244 
245  // Update parameters to match dataset contents
246  // Compute initial dataspace and chunk dimensions
247  dims[0] = dims_out[0]; // Set to match existing data
248  maxdims[0] = H5S_UNLIMITED; // No upper limit on number of records allowed in dataset
249  chunkdims[0] = HDF5_CHUNKLENGTH;
250  //slicedims[0] = 1; // Dimensions of a single record in the data space
251 
252  // Release dataspace handle
253  H5Sclose(dspace_id);
254 
255  // Register that we have opened the dataset
256  is_open = true;
257  }
258 
261  {
262  if(is_open)
263  {
264  if(dset_id>=0)
265  {
266  herr_t status = H5Dclose(dset_id);
267  if(status<0)
268  {
269  std::ostringstream errmsg;
270  errmsg << "Error closing dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Dclose failed.";
271  printer_error().raise(LOCAL_INFO, errmsg.str());
272  }
273  }
274  else
275  {
276  std::ostringstream errmsg;
277  errmsg << "Error closing dataset (with name=\""<<myname()<<"\") in HDF5 file. Dataset ID is negative. This would usually indicate that the dataset is not open, however the 'is_open' flag is 'true'. This is a bug, please report it.";
278  printer_error().raise(LOCAL_INFO, errmsg.str());
279  }
280  }
281  else
282  {
283  std::ostringstream errmsg;
284  errmsg << "Error closing dataset (with name=\""<<myname()<<"\") in HDF5 file. The dataset is not open! This is a bug, please report it.";
285  printer_error().raise(LOCAL_INFO, errmsg.str());
286  }
287  is_open = false;
288  }
289 
291  std::pair<hid_t,hid_t> HDF5DataSetBase::select_hyperslab(std::size_t offset, std::size_t length) const
292  {
293  // Make sure that this chunk lies within the dataset extents
294  if(offset + length > get_dset_length())
295  {
296  std::ostringstream errmsg;
297  errmsg << "Error selecting chunk from dataset (with name=\""<<myname()<<") in HDF5 file. Tried to select a hyperslab which extends beyond the dataset extents:" << std::endl;
298  errmsg << " offset = " << offset << std::endl;
299  errmsg << " offset+length = " << offset+length << std::endl;
300  errmsg << " dset_length = "<< get_dset_length() << std::endl;
301  printer_error().raise(LOCAL_INFO, errmsg.str());
302  }
303 
304  // Select a hyperslab.
305  hid_t dspace_id = H5Dget_space(get_dset_id());
306  if(dspace_id<0)
307  {
308  std::ostringstream errmsg;
309  errmsg << "Error selecting chunk from dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Dget_space failed." << std::endl;
310  printer_error().raise(LOCAL_INFO, errmsg.str());
311  }
312 
313  hsize_t offsets[DSETRANK];
314  offsets[0] = offset;
315 
316  hsize_t selection_dims[DSETRANK]; // Set same as output chunks, but may have a different length
317  selection_dims[0] = length; // Adjust chunk length to input specification
318 
319  herr_t err_hs = H5Sselect_hyperslab(dspace_id, H5S_SELECT_SET, offsets, NULL, selection_dims, NULL);
320 
321  if(err_hs<0)
322  {
323  std::ostringstream errmsg;
324  errmsg << "Error selecting chunk from dataset (with name=\""<<myname()<<"\", offset="<<offset<<", length="<<selection_dims[0]<<") in HDF5 file. H5Sselect_hyperslab failed." << std::endl;
325  printer_error().raise(LOCAL_INFO, errmsg.str());
326  }
327 
328  // Define memory space
329  //H5::DataSpace memspace( DSETRANK, this->get_chunkdims() );
330  hid_t memspace_id = H5Screate_simple(DSETRANK, selection_dims, NULL);
331 
332  #ifdef HDF5_DEBUG
333  std::cout << "Debug variables:" << std::endl
334  << " dset_length = " << get_dset_length() << std::endl
335  << " offsets[0] = " << offsets[0] << std::endl
336  << " selection_dims[0] = " << selection_dims[0] << std::endl;
337  #endif
338 
339  return std::make_pair(memspace_id, dspace_id); // Be sure to close these identifiers after using them!
340  }
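  // Typical caller pattern for the returned pair (sketch only; the buffer argument is a
  // placeholder and must match the dataset's type):
  //
  //   std::pair<hid_t,hid_t> ids = select_hyperslab(offset, length);
  //   H5Dread(get_dset_id(), hdftype_id, ids.first, ids.second, H5P_DEFAULT, buffer);
  //   H5Sclose(ids.first);   // memory dataspace
  //   H5Sclose(ids.second);  // file dataspace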
341 
343 
345 
346  HDF5DataSetBasic::HDF5DataSetBasic(const std::string& name)
347  : HDF5DataSetBase(name)
348  {}
349 
351  {
352  std::ostringstream errmsg;
353  errmsg<<"Tried to use create_dataset function in a HDF5DataSetBasic object! This is not allowed. This object is only able to perform basic operations on existing datasets, like resizings them. For full dataset access a HDF5DataSet<T> object is required, where the type T of the dataset must be known.";
354  printer_error().raise(LOCAL_INFO, errmsg.str());
355  }
356 
358 
360 
362  HDF5BufferBase::HDF5BufferBase(const std::string& name, const bool sync)
363  : _dset_name(name)
364  , synchronised(sync)
365  {}
366 
368  std::string HDF5BufferBase::dset_name() const
369  {
370  return _dset_name;
371  }
372 
375  {
376  return synchronised;
377  }
378 
380  std::set<PPIDpair> HDF5BufferBase::get_points_set() const
381  {
382  return buffer_set;
383  }
384 
386 
387 
389 
390  HDF5MasterBuffer::HDF5MasterBuffer(const std::string& filename, const std::string& groupname, const bool sync, const std::size_t buflen
391 #ifdef WITH_MPI
392  , GMPI::Comm& comm
393 #endif
394  )
395  : synchronised(sync)
396  , buffer_length(sync ? buflen : MAX_BUFFER_SIZE) // Use buflen for the bufferlength if this is a sync buffer, otherwise use MAX_BUFFER_SIZE
397  , file(filename)
398  , group(groupname)
399  , file_id(-1)
400  , group_id(-1)
401  , location_id(-1)
402  , hdf5out(file,false)
403  , file_open(false)
404  , have_lock(false)
405 #ifdef WITH_MPI
406  , hdf5_buffers_int(sync,comm)
407  , hdf5_buffers_uint(sync,comm)
408  , hdf5_buffers_long(sync,comm)
409  , hdf5_buffers_ulong(sync,comm)
410  //, hdf5_buffers_longlong(sync,comm)
411  //, hdf5_buffers_ulonglong(sync,comm)
412  , hdf5_buffers_float(sync,comm)
413  , hdf5_buffers_double(sync,comm)
414  , myComm(comm)
415 #else
416  , hdf5_buffers_int(sync)
417  , hdf5_buffers_uint(sync)
418  , hdf5_buffers_long(sync)
419  , hdf5_buffers_ulong(sync)
420  //, hdf5_buffers_longlong(sync)
421  //, hdf5_buffers_ulonglong(sync)
422  , hdf5_buffers_float(sync)
423  , hdf5_buffers_double(sync)
424 #endif
425  {
426  //std::cout<<"Constructed MasterBuffer to attach to file/group:"<<std::endl;
427  //std::cout<<"file : "<<file<<std::endl;
428  //std::cout<<"group: "<<group<<std::endl;
429  }
430 
432  {
434  }
435 
437  {
438  return synchronised;
439  }
440 
443  {
444  return buffer_length;
445  }
446 
448  std::string HDF5MasterBuffer::get_file() { return file; }
449 
452  //std::cout<<"Accessing variable 'group'"<<std::endl;
453  //std::cout<<"group = "<<group<<std::endl;
454  return group;
455  }
456 
459  {
460  return buffered_points.size();
461  }
462 
466  {
467  if(get_Npoints()>0) // No point trying to flush an already empty buffer
468  {
469  // Obtain lock on the output file
471 
472  // Determine next available output slot (i.e. current nominal dataset length)
473  std::size_t target_pos = get_next_free_position();
474 
475  // Behaviour is different depending on whether this buffer manager
476  // manages synchronised or RA buffers
477  if(is_synchronised())
478  {
479  // DEBUG
480  #ifdef HDF5PRINTER2_DEBUG
482  logger()<<"Preparing to flush "<<buffered_points.size()<<" points to target position "<<target_pos<<std::endl;
483  std::size_t i=0;
484  for(auto it=buffered_points.begin(); it!=buffered_points.end(); ++it, ++i)
485  {
486  logger()<<" buffered_point "<<i<<": "<<(*it)<<std::endl;
487  }
488  logger()<<EOM;
489  #endif
490  for(auto it=all_buffers.begin(); it!=all_buffers.end(); ++it)
491  {
492  // Extend the output datasets to the next free position (in case some have been left behind)
493  it->second->ensure_dataset_exists(location_id, target_pos);
494 
495  // For synchronised writes we have to tell the buffers what order to write their output,
496  // and where. 'target_pos' should usually be the end of their dataset, unless data for
497  // a certain dataset was not written for some buffer dump.
498  it->second->block_flush(location_id,buffered_points,target_pos);
499  }
500  buffered_points.clear();
501  buffered_points_set.clear();
502  }
503  else
504  {
505  //std::cout<<"Preparing to flush "<<buffered_points.size()<<" random-access points to datasets with length "<<target_pos<<std::endl;
506 
507  // DEBUG inspect buffered points
508  //std::size_t i=0;
509  //for(auto it=buffered_points.begin(); it!=buffered_points.end(); ++it, ++i)
510  //{
511  // std::cout<<"buffered_points["<<i<<"] = "<<(*it)<<std::endl;
512  //}
513 
514  // For RA writes we need to first locate the positions on disk of all
515  // the points to be written
516  // We will do this in (large) chunks.
517  bool done = false;
518  auto it = buffered_points.begin();
519  std::set<PPIDpair> done_points;
520  while(not done)
521  {
522  std::vector<PPIDpair> sub_buffer;
523  sub_buffer.clear();
524  for(std::size_t j=0; (j<1000000) && (it!=buffered_points.end()); ++j, ++it)
525  {
526  sub_buffer.push_back(*it);
527  }
528  if(it==buffered_points.end()) done=true;
529  //std::cout<<"Getting dataset positions for "<<sub_buffer.size()<<" points"<<std::endl;
530  //std::cout<<"("<<buffered_points.size()-sub_buffer.size()<<" points remaining)"<<std::endl;
531 
532  // Obtain locations on disk of all points in sub_buffer
533  const std::map<PPIDpair,std::size_t> position_map(get_position_map(sub_buffer));
534 
535  // Record which points were found (the ones that were not found need to be left in the buffer)
536  for(auto jt = position_map.begin(); jt!=position_map.end(); ++jt)
537  {
538  //std::cout<<"position_map["<<jt->first<<"] = "<<jt->second<<std::endl;
539  if(jt->second > target_pos)
540  {
541  std::ostringstream errmsg;
542  errmsg<<"A target position for a RA write is beyond the current nominal dataset size! This doesn't make sense because we should not have retrieved such positions. This is a bug, please report it.";
543  printer_error().raise(LOCAL_INFO, errmsg.str());
544  }
545  done_points.insert(jt->first);
546  }
547 
548  // Tell all buffers to write their points to disk according to the position map
549  for(auto jt = all_buffers.begin(); jt!=all_buffers.end(); ++jt)
550  {
551  // Extend datasets to current nominal size and flush RA data
552  jt->second->ensure_dataset_exists(location_id, target_pos);
553  jt->second->random_flush(location_id, position_map);
554  }
555  }
556 
557 
558  // Remove flushed points from the buffered points record
559  std::vector<PPIDpair> remaining_buffered_points;
560  for(auto it=buffered_points.begin(); it!=buffered_points.end(); ++it)
561  {
562  if(done_points.count(*it)==0)
563  {
564  // This point couldn't be flushed (wasn't found on disk (yet))
565  //DEBUG
566  //std::cout<<"Could not flush point "<<(*it)<<", leaving it in the buffer."<<std::endl;
567  remaining_buffered_points.push_back(*it);
568  }
569  }
570  buffered_points = remaining_buffered_points;
573  if(buffered_points.size() != buffered_points_set.size())
574  {
575  std::stringstream msg;
576  msg<<"Inconsistency detected between buffered_points and buffered_points_set sizes after subtracting done_points ("<<buffered_points.size()<<" vs "<<buffered_points_set.size()<<")! This is a bug, please report it."<<std::endl;
577  printer_error().raise(LOCAL_INFO,msg.str());
578  }
579  //std::cout<<buffered_points.size()<<" points failed to flush from random-access buffer."<<std::endl;
580  }
581 
582  // Release lock on output file
584  }
585  }
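  // In summary: synchronised buffers append whole blocks at the current end of the
  // datasets, while RA (random-access) buffers first look up the on-disk rows of their
  // points (via get_position_map) and then scatter their values into place, leaving any
  // points that could not yet be located in the buffer for a later flush.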
586 
588  std::vector<std::pair<std::string,int>> HDF5MasterBuffer::get_all_dset_names_on_disk()
589  {
591 
592  // Get all object names in the group
593  std::vector<std::string> names = HDF5::lsGroup(group_id);
594  std::vector<std::pair<std::string,int>> dset_names_and_types;
595 
596  // Determine which objects are datasets
597  for(auto it = names.begin(); it!=names.end(); ++it)
598  {
599  if(HDF5::isDataSet(group_id, *it) and not Utils::endsWith(*it,"_isvalid"))
600  {
601  // Figure out the type
602  hid_t h5type = HDF5::getH5DatasetType(group_id, *it);
603  int type = HDF5::inttype_from_h5type(h5type);
604  dset_names_and_types.push_back(std::make_pair(*it,type));
605  }
606  }
607 
609 
610  return dset_names_and_types;
611  }
612 
613  #ifdef WITH_MPI
614  void HDF5MasterBuffer::MPI_flush_to_rank(const unsigned int r)
616  {
617  for(auto it=all_buffers.begin(); it!=all_buffers.end(); ++it)
618  {
619  it->second->MPI_flush_to_rank(r);
620  }
621  buffered_points.clear();
622  buffered_points_set.clear();
623  }
624 
626  // Don't do this for all processes at once, as MPI can run out of
627  // Recv request IDs behind the scenes if thousands of processes are
628  // trying to send it lots of stuff at once. But it is good to let
629  // many processes start sending data before we need it, so that the
630  // Recv goes quickly (and is effectively already done in the background)
631  // when we get to it.
632  void HDF5MasterBuffer::MPI_request_buffer_data(const unsigned int r)
633  {
634  logger()<< LogTags::printers << LogTags::info << "Asking process "<<r<<" to begin sending buffer data"<<EOM;
635  // Send a message to the process to trigger it to begin sending buffer data (if any exists)
636  int begin_sending = 1;
637  myComm.Send(&begin_sending, 1, r, h5v2_BEGIN);
638  }
639 
641  void HDF5MasterBuffer::MPI_recv_all_buffers(const unsigned int r)
642  {
643  // MAKE SURE MPI_request_buffer_data(r) HAS BEEN CALLED BEFORE THIS FUNCTION!
644  // Otherwise a deadlock will occur because we will wait forever for buffer data
645  // that will never be sent.
646 
647  int more_buffers = 1;
648  int max_Npoints = 0; // Track largest buffer sent, for reporting general number of points recv'd
649  int Nbuffers = 0; // Number of buffers recv'd
650  while(more_buffers)
651  {
652  // Check "more buffers" message
653  myComm.Recv(&more_buffers, 1, r, h5v2_BLOCK);
654  //recv_counter+=1;
655  logger()<<LogTags::printers<<LogTags::debug<<"More buffers to receive from process "<<r<<"? "<<more_buffers<<EOM;
656  if(more_buffers)
657  {
658  Nbuffers+=1;
659  // Retrieve the name of the dataset for the buffer data
660  MPI_Status status;
661  myComm.Probe(r, h5v2_bufname, &status);
662  int name_size;
663  int err = MPI_Get_count(&status, MPI_CHAR, &name_size);
664  if(err<0)
665  {
666  std::ostringstream errmsg;
667  errmsg<<"MPI_Get_count failed while trying to get name of dataset to receive!";
668  printer_error().raise(LOCAL_INFO, errmsg.str());
669  }
670  std::string dset_name(name_size, 'x'); // Initialise string to correct size, but filled with x's
671  myComm.Recv(&dset_name[0], name_size, MPI_CHAR, r, h5v2_bufname);
672 
673  logger()<<LogTags::printers<<LogTags::debug<<"Preparing to receive buffer data from process "<<r<<" for buffer "<<dset_name<<EOM;
674 
675  // Get datatype of buffer data, and call matching receive function for that type
676  int buftype;
677  myComm.Recv(&buftype, 1, r, h5v2_bufdata_type);
678  int Npoints = 0;
679  switch(buftype)
680  {
681  case h5v2_type<int >(): Npoints = MPI_recv_buffer<int >(r, dset_name); break;
682  case h5v2_type<uint >(): Npoints = MPI_recv_buffer<uint >(r, dset_name); break;
683  case h5v2_type<long >(): Npoints = MPI_recv_buffer<long >(r, dset_name); break;
684  case h5v2_type<ulong >(): Npoints = MPI_recv_buffer<ulong >(r, dset_name); break;
685  //case h5v2_type<longlong >(): Npoints = MPI_recv_buffer<longlong >(r, dset_name); break;
686  //case h5v2_type<ulonglong>(): Npoints = MPI_recv_buffer<ulonglong>(r, dset_name); break;
687  case h5v2_type<float >(): Npoints = MPI_recv_buffer<float >(r, dset_name); break;
688  case h5v2_type<double >(): Npoints = MPI_recv_buffer<double >(r, dset_name); break;
689  default:
690  std::ostringstream errmsg;
691  errmsg<<"Unrecognised datatype integer (value = "<<buftype<<") received in buffer type message from rank "<<r<<" for dataset "<<dset_name<<"!";
692  printer_error().raise(LOCAL_INFO, errmsg.str());
693  }
694  if(Npoints>max_Npoints) max_Npoints = Npoints;
695  }
696  }
697 
699  if(max_Npoints==0)
700  {
701  logger()<<"No print buffer data recv'd from rank "<<r<<" process"<<std::endl;
702  }
703  else
704  {
705  logger()<<"Recv'd "<<Nbuffers<<" dataset buffers from rank "<<r<<"; the largest one contained "<<max_Npoints<<" points"<<std::endl;
706  }
707  // Debug
708  //for(auto it=all_buffers.begin(); it!=all_buffers.end(); ++it)
709  //{
710  // std::cout<<"New buffer length is "<<it->second->N_items_in_buffer()<<" (name="<<it->second->dset_name()<<")"<<std::endl;
711  //}
712 
713  logger()<<"Finished checking for buffer data messages from process "<<r<<EOM;
714  }
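  // Message sequence per partner process (as implemented above):
  //   1. the master sends h5v2_BEGIN (see MPI_request_buffer_data);
  //   2. the worker repeatedly sends an h5v2_BLOCK "more buffers" flag, and, while it is
  //      nonzero, the dataset name (h5v2_bufname), the buffer type id (h5v2_bufdata_type),
  //      and finally the buffer data itself (received by the typed MPI_recv_buffer<T> helpers).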
715 
716  // Add a vector of buffer chunk data to the buffers managed by this object
717  void HDF5MasterBuffer::add_to_buffers(const std::vector<HDF5bufferchunk>& blocks, const std::vector<std::pair<std::string,int>>& buf_types)
718  {
719  for(auto it=blocks.begin(); it!=blocks.end(); ++it)
720  {
721  const HDF5bufferchunk& block(*it);
723  {
724  std::ostringstream errmsg;
725  errmsg<<"Invalid block detected! used_size exceeds max SIZE ("<<block.used_size<<">"<<HDF5bufferchunk::SIZE<<"). This is a bug, please report it.";
726  printer_error().raise(LOCAL_INFO, errmsg.str());
727  }
729  {
730  std::ostringstream errmsg;
731  errmsg<<"Invalid block detected! used_nbuffers exceeds max NBUFFERS ("<<block.used_nbuffers<<">"<<HDF5bufferchunk::NBUFFERS<<"). This is a bug, please report it.";
732  printer_error().raise(LOCAL_INFO, errmsg.str());
733  }
734 
735  for(std::size_t j=0; j<block.used_nbuffers; j++)
736  {
737  // Work out dataset name and whether we want the int or float values for this buffer
738  std::string name = buf_types.at(block.name_id[j]).first;
739  int type = buf_types.at(block.name_id[j]).second;
740  #ifdef HDF5PRINTER2_DEBUG
742  logger()<<"Adding block["<<j<<"] with type ID "<<type<<" and name ID "<<block.name_id[j]<<" to dataset named "<<name<<EOM;
743  #endif
744  switch(type)
745  {
746  case h5v2_type<int >(): MPI_add_int_block_to_buffer<int >(block, name, j); break;
747  case h5v2_type<uint >(): MPI_add_int_block_to_buffer<uint >(block, name, j); break;
748  case h5v2_type<long >(): MPI_add_int_block_to_buffer<long >(block, name, j); break;
749  case h5v2_type<ulong >(): MPI_add_int_block_to_buffer<ulong >(block, name, j); break;
750  //case h5v2_type<longlong >(): MPI_add_int_block_to_buffer<longlong >(block, name, j); break;
751  //case h5v2_type<ulonglong>(): MPI_add_int_block_to_buffer<ulonglong>(block, name, j); break;
752  case h5v2_type<float >(): MPI_add_float_block_to_buffer<float >(block, name, j); break;
753  case h5v2_type<double >(): MPI_add_float_block_to_buffer<double >(block, name, j); break;
754  default:
755  std::ostringstream errmsg;
756  errmsg<<"Unrecognised datatype integer (value = "<<type<<") received in a buffer block for dataset "<<name<<"!";
757  printer_error().raise(LOCAL_INFO, errmsg.str());
758  }
759  }
760  }
761  }
762 
763  #endif
764 
767  {
768  if(not (file_open and have_lock))
769  {
770  std::ostringstream errmsg;
771  errmsg << "Error! Output HDF5 file '"<<file<<"' is not open and locked! Code following this check is not permitted to run. This is a bug in HDF5Printer2, please report it.";
772  printer_error().raise(LOCAL_INFO, errmsg.str());
773  }
774  }
775 
777  void HDF5MasterBuffer::lock_and_open_file(const char access_type)
778  {
779  if(have_lock)
780  {
781  std::stringstream err;
782  err<<"HDF5MasterBuffer attempted to obtain a lock on the output hdf5 file, but it already has the lock! This is a bug, please report it."<<std::endl;
783  printer_error().raise(LOCAL_INFO, err.str());
784  }
785 
786  if(file_open)
787  {
788  std::stringstream err;
789  err<<"HDF5Printer2 attempted to open the output hdf5 file, but it is already open! This is a bug, please report it."<<std::endl;
790  printer_error().raise(LOCAL_INFO, err.str());
791  }
792 
793  // Obtain the lock
794  hdf5out.get_lock();
795 
796  // Open the file and target groups
797  file_id = HDF5::openFile(file,false,access_type);
799 
800  // Set the target dataset write location to the chosen group
802 
803  file_open=true;
804  have_lock=true;
805  }
806 
809  {
810  if(not have_lock)
811  {
812  std::stringstream err;
813  err<<"HDF5Printer2 attempted to release a lock on the output hdf5 file, but it doesn't currently have the lock! This is a bug, please report it."<<std::endl;
814  printer_error().raise(LOCAL_INFO, err.str());
815  }
816 
817  if(not file_open)
818  {
819  std::stringstream err;
820  err<<"HDF5Printer2 attempted to close the output hdf5 file, but it is not currently open! This is a bug, please report it."<<std::endl;
821  printer_error().raise(LOCAL_INFO, err.str());
822  }
823 
824  if(group_id<0)
825  {
826  // This especially shouldn't happen because the 'file_open' flag should not be 'true' if the group has been closed.
827  std::stringstream err;
828  err<<"HDF5Printer2 attempted to close the output hdf5 file, but group_id<0 indicating that that output group is already closed (or was never opened)! This is a bug, please report it."<<std::endl;
829  printer_error().raise(LOCAL_INFO, err.str());
830  }
831 
832  if(file_id<0)
833  {
834  // This especially shouldn't happen because the 'file_open' flag should not be 'true' if the group has been closed.
835  std::stringstream err;
836  err<<"HDF5Printer2 attempted to close the output hdf5 file, but file_id<0 indicating that that output file is already closed (or was never opened)! This is a bug, please report it."<<std::endl;
837  printer_error().raise(LOCAL_INFO, err.str());
838  }
839 
840  // Close groups and file
843 
844  // Release the lock
845  hdf5out.release_lock();
846 
847  file_open=false;
848  have_lock=false;
849  }
850 
853  {
855  for(auto it=all_buffers.begin(); it!=all_buffers.end(); ++it)
856  {
857  it->second->reset(location_id);
858  }
860  buffered_points.clear();
861  buffered_points_set.clear();
862  }
863 
865  void HDF5MasterBuffer::update_buffer_map(const std::string& label, HDF5BufferBase& buff)
866  {
867  auto it=all_buffers.find(label);
868  if(it==all_buffers.end())
869  {
871  all_buffers.emplace(label,&buff);
872  }
873  else if(&buff!=it->second) // if candidate buffer not the same as the one already in the map
874  {
875  if(buff.get_type_id() != it->second->get_type_id())
876  {
877  // Make sure that we haven't accidentally duplicated a buffer due to type confusion!
878  std::stringstream err;
879  err<<"Tried to add a buffer with label "<<label<<" to a MasterBuffer, but a non-identical buffer with the same name and different type already exists! This is a bug, please report it. Types were (existing:"<<it->second->get_type_id()<<", new:"<<buff.get_type_id()<<")";
880  printer_error().raise(LOCAL_INFO, err.str());
881  }
882  else
883  {
884  // Make sure that we haven't accidentally duplicated a buffer some other bizarre way
885  std::stringstream err;
886  err<<"Tried to add a buffer with label "<<label<<" to a MasterBuffer, but a non-identical buffer with the same name and same type already exists (type="<<buff.get_type_id()<<")! This shouldn't be possible and is a bug, please report it.";
887  printer_error().raise(LOCAL_INFO, err.str());
888  }
889  }
890  }
891 
896  {
897  for(auto it=all_buffers.begin(); it!=all_buffers.end(); ++it)
898  {
899  it->second->update(ppid);
900  }
901  }
902 
905  {
906  return all_buffers.size();
907  }
908 
910  // Used to trigger dump to disk if buffer is using too much RAM
911  // All datasets treated as doubles for simplicity
913  {
914  double Nbytes = (8.+4.) * (double)get_Nbuffers() * (double)get_Npoints();
915  return pow(2,-20) * Nbytes; // bytes -> MB (base 2)
916  }
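  // (A rough estimate: each buffered entry is presumably taken to occupy ~8 bytes of data
  //  plus ~4 bytes for the accompanying int "_isvalid" flag, hence the 8.+4. factor above.
  //  This overestimates int-valued buffers, but errs on the safe side when deciding
  //  whether to dump to disk.)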
917 
918 
921  {
923 
924  HDF5DataSet<int> mpiranks ("MPIrank"); // Will need some constructor arguments
925  HDF5DataSet<int> mpiranks_valid("MPIrank_isvalid"); // Will need some constructor arguments
926  HDF5DataSet<ulong> pointids ("pointID");
927  HDF5DataSet<int> pointids_valid("pointID_isvalid");
928 
929  // Open all datasets that we need
930  mpiranks .open_dataset(location_id);
931  mpiranks_valid.open_dataset(location_id);
932  pointids .open_dataset(location_id);
933  pointids_valid.open_dataset(location_id);
934 
935  std::size_t r_size = mpiranks .get_dset_length();
936  std::size_t rv_size = mpiranks_valid.get_dset_length();
937  std::size_t p_size = pointids .get_dset_length();
938  std::size_t pv_size = pointids_valid.get_dset_length();
939 
940  //std::cout<<"mpiranks dataset length: "<<r_size<<std::endl;
941 
942  if( (r_size!=rv_size)
943  || (r_size!=p_size)
944  || (r_size!=pv_size) )
945  {
946  std::ostringstream errmsg;
947  errmsg<<"Inconsistency in dataset sizes detected! This is a bug, please report it.";
948  printer_error().raise(LOCAL_INFO, errmsg.str());
949  }
950 
951  // Close datasets
952  mpiranks .close_dataset();
953  mpiranks_valid.close_dataset();
954  pointids .close_dataset();
955  pointids_valid.close_dataset();
956 
957  return r_size;
958  }
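  // The common length of the MPIrank/pointID control datasets (which must all agree) is
  // treated as the nominal length of every dataset in the group, i.e. the next free row
  // index for synchronised writes. Datasets that have fallen behind are padded out to this
  // length before flushing (see the ensure_dataset_exists calls in the flush routine above).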
959 
961  std::map<PPIDpair,std::size_t> HDF5MasterBuffer::get_position_map(const std::vector<PPIDpair>& buffer) const
962  {
964 
965  std::map<PPIDpair,std::size_t> position_map_out;
966 
967  HDF5DataSet<int> mpiranks ("MPIrank"); // Will need some constructor arguments
968  HDF5DataSet<int> mpiranks_valid("MPIrank_isvalid"); // Will need some constructor arguments
969  HDF5DataSet<ulong> pointids ("pointID");
970  HDF5DataSet<int> pointids_valid("pointID_isvalid");
971 
972  // Copy point buffer into a set for faster lookup
973  const std::set<PPIDpair> buffer_set(buffer.begin(), buffer.end());
974 
975  // DEBUG check that copy was correct
976  //for(auto it=buffer_set.begin(); it!=buffer_set.end(); ++it) std::cout<<" buffer_set item: "<<(*it)<<std::endl;
977 
978  //std::cout<<"Obtaining position map for random access write"<<std::endl;
979 
980  // Open all datasets that we need
981  mpiranks .open_dataset(location_id);
982  mpiranks_valid.open_dataset(location_id);
983  pointids .open_dataset(location_id);
984  pointids_valid.open_dataset(location_id);
985 
986  std::size_t dset_length = mpiranks.get_dset_length();
987  std::size_t Nchunks = dset_length / HDF5_CHUNKLENGTH;
988  std::size_t remainder = dset_length % HDF5_CHUNKLENGTH;
989  if(remainder!=0) Nchunks+=1;
990  for(std::size_t i=0; i<Nchunks; i++)
991  {
992  std::size_t offset = i * HDF5_CHUNKLENGTH;
993  std::size_t length = HDF5_CHUNKLENGTH;
994  if(remainder!=0 and (i+1)==Nchunks) length = remainder;
995  //std::cout<<"Reading chunk "<<offset<<" to "<<offset+length<<"(chunk "<<i<<" of "<<Nchunks<<")"<<std::endl;
996 
997  std::vector<int> r_chunk = mpiranks .get_chunk(offset,length);
998  std::vector<int> rv_chunk = mpiranks_valid.get_chunk(offset,length);
999  std::vector<ulong> p_chunk = pointids .get_chunk(offset,length);
1000  std::vector<int> pv_chunk = pointids_valid.get_chunk(offset,length);
1001 
1002  std::size_t position = offset;
1003 
1004  auto rt =r_chunk .begin();
1005  auto rvt=rv_chunk.begin();
1006  auto pt =p_chunk .begin();
1007  auto pvt=pv_chunk.begin();
1008  for(; // Variables declared above due to different types
1009  (rt !=r_chunk.end() )
1010  && (rvt!=rv_chunk.end())
1011  && (pt !=p_chunk.end() )
1012  && (pvt!=pv_chunk.end());
1013  ++rt,++rvt,++pt,++pvt,++position)
1014  {
1015  //DEBUG Dump everything as we read it
1016  //std::cout<<"position: "<<position<<", point: ("<<(*rt)<<", "<<(*pt)<<"), valid: ("<<(*rvt)<<", "<<(*pvt)<<")"<<std::endl;
1017 
1018  // Check if point is valid
1019  if((*rvt) and (*pvt))
1020  {
1021  // Check if this is one of the points we are looking for
1022  PPIDpair candidate(*pt,*rt);
1023  if(buffer_set.count(candidate)>0)
1024  {
1025  //std::cout<<" Point "<<candidate<<" is a match at position "<<position<<std::endl;
1026  position_map_out[candidate] = position;
1027  }
1028  }
 1029  else if(not ((*rvt) or (*pvt)))
1030  {
1031  // Point is not valid. Skip it.
1032  continue;
1033  }
1034  else
1035  {
1036  // Inconsistent validity flags.
1037  std::ostringstream errmsg;
1038  errmsg<<"Inconsistent validity flags detected whilst determining dataset locations for RA buffer data! This is a bug, please report it.";
1039  printer_error().raise(LOCAL_INFO, errmsg.str());
1040  }
1041  }
1042  }
1043 
1044  // Close all datasets
1045  mpiranks .close_dataset();
1046  mpiranks_valid.close_dataset();
1047  pointids .close_dataset();
1048  pointids_valid.close_dataset();
1049 
1050  // DEBUG check result
1051  //for(auto it=position_map_out.begin(); it!=position_map_out.end(); ++it)
1052  //{
1053  // std::cout<<"Point "<<it->first<<" found at index "<<it->second<<std::endl;
1054  //}
1055 
1056  return position_map_out;
1057  }
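  // Note: only points that were actually found (and marked valid) on disk appear in the
  // returned map; the flush routine leaves any missing points in the buffer so that they
  // can be retried once the corresponding synchronised data has been written.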
1058 
1059  // Read through the output dataset and find the highest pointIDs for each rank (up to maxrank)
1060  // (assumes file is closed)
1061  std::map<ulong, ulong> HDF5MasterBuffer::get_highest_PPIDs(const int mpisize)
1062  {
1063  lock_and_open_file('r');
1064 
1065  std::map<ulong, ulong> highests;
1066  for(int i=0; i<mpisize; ++i)
1067  {
1068  highests[i] = 0;
1069  }
1070 
1071  HDF5DataSet<int> mpiranks ("MPIrank");
1072  HDF5DataSet<int> mpiranks_valid("MPIrank_isvalid");
1073  HDF5DataSet<ulong> pointids ("pointID");
1074  HDF5DataSet<int> pointids_valid("pointID_isvalid");
1075 
1076  // Open all datasets that we need
1077  mpiranks .open_dataset(location_id);
1078  mpiranks_valid.open_dataset(location_id);
1079  pointids .open_dataset(location_id);
1080  pointids_valid.open_dataset(location_id);
1081 
1082  std::size_t dset_length = mpiranks.get_dset_length();
1083  std::size_t Nchunks = dset_length / HDF5_CHUNKLENGTH;
1084  std::size_t remainder = dset_length % HDF5_CHUNKLENGTH;
1085  if(remainder!=0) Nchunks+=1;
1086  for(std::size_t i=0; i<Nchunks; i++)
1087  {
1088  std::size_t offset = i * HDF5_CHUNKLENGTH;
1089  std::size_t length = HDF5_CHUNKLENGTH;
1090  if(remainder!=0 and (i+1)==Nchunks) length = remainder;
1091 
1092  std::vector<int> r_chunk = mpiranks .get_chunk(offset,length);
1093  std::vector<int> rv_chunk = mpiranks_valid.get_chunk(offset,length);
1094  std::vector<ulong> p_chunk = pointids .get_chunk(offset,length);
1095  std::vector<int> pv_chunk = pointids_valid.get_chunk(offset,length);
1096 
1097  std::size_t position = offset;
1098 
1099  auto rt =r_chunk .begin();
1100  auto rvt=rv_chunk.begin();
1101  auto pt =p_chunk .begin();
1102  auto pvt=pv_chunk.begin();
1103  for(; // Variables declared above due to different types
1104  (rt!=r_chunk.end())
1105  && (rvt!=rv_chunk.end())
1106  && (pt!=p_chunk.end())
1107  && (pvt!=pv_chunk.end());
1108  ++rt,++rvt,++pt,++pvt,++position)
1109  {
1110  // Check if point is valid
1111  if((*rvt) and (*pvt))
1112  {
1113  // Yep, valid, check if it has a higher pointID for this rank than previously seen
1114  if( ((*rt)<mpisize) and ((*pt)>highests.at(*rt)) )
1115  {
1116  highests.at(*rt) = *pt;
1117  }
1118  }
 1119  else if(not ((*rvt) or (*pvt)))
1120  {
1121  // Point is not valid. Skip it.
1122  continue;
1123  }
1124  else
1125  {
1126  // Inconsistent validity flags.
1127  std::ostringstream errmsg;
1128  errmsg<<"Inconsistent validity flags detected whilst finding highest pointIDs in existing output! This is a bug, please report it.";
1129  printer_error().raise(LOCAL_INFO, errmsg.str());
1130  }
1131  }
1132  }
1133 
1134  // Close datasets
1135  mpiranks .close_dataset();
1136  mpiranks_valid.close_dataset();
1137  pointids .close_dataset();
1138  pointids_valid.close_dataset();
1139 
1141 
1142  return highests;
1143  }
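  // The per-rank maxima found here are scattered back out to all processes by the primary
  // printer when resuming, so that each process restarts its pointID counter from where its
  // previous run left off (see the HDF5Printer2 constructor below).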
1144 
1147  {
1148  bool all_empty=true;
1149  for(auto it=all_buffers.begin(); it!=all_buffers.end(); ++it)
1150  {
1151  if(it->second->N_items_in_buffer()!=0) all_empty=false;
1152  }
1153  return all_empty;
1154  }
1155 
1158  {
1159  std::stringstream ss;
1160  for(auto it=all_buffers.begin(); it!=all_buffers.end(); ++it)
1161  {
1162  if(it->second->N_items_in_buffer()!=0)
1163  {
1164  ss << " Buffer "<<it->first<<" contains "<<it->second->N_items_in_buffer()<<" unwritten items (synchronised="<<it->second->is_synchronised()<<")"<<std::endl;
1165  // VERBOSE DEBUG OUTPUT
1166  ss << " Unwritten points are:" << std::endl;
1167  auto point_set = it->second->get_points_set();
1168  for(auto jt = point_set.begin(); jt!=point_set.end(); ++jt)
1169  {
1170  ss << " rank="<<jt->rank<<", pointID="<<jt->pointID<<std::endl;
1171  }
1172  }
1173  }
1174  return ss.str();
1175  }
1176 
1179  {
1181  return location_id;
1182  }
1183 
1185  void HDF5MasterBuffer::extend_all_datasets_to(const std::size_t length)
1186  {
1188  for(auto it=all_buffers.begin(); it!=all_buffers.end(); ++it)
1189  {
1190  it->second->ensure_dataset_exists(location_id, length);
1191  }
1193  }
1194 
1196  const std::map<std::string,HDF5BufferBase*>& HDF5MasterBuffer::get_all_buffers() { return all_buffers; }
1197 
1199  const std::set<PPIDpair>& HDF5MasterBuffer::get_all_points() { return buffered_points_set; }
1200 
1206  {
1207  logger()<<LogTags::printers<<LogTags::info<<"Resynchronising print buffers:" << std::endl;
1208 
1209  // Determine all known points in all buffers
1210  std::set<PPIDpair> initial_buffer_points = buffered_points_set;
1211  for(auto it=all_buffers.begin(); it!=all_buffers.end(); ++it)
1212  {
1213  logger()<<" Buffer contains "<<it->second->N_items_in_buffer()<<" items (name="<<it->second->dset_name()<<")"<<std::endl;
1214  std::set<PPIDpair> this_buffer_points = it->second->get_points_set();
1215  buffered_points_set.insert(this_buffer_points.begin(), this_buffer_points.end());
1216  }
1217 
1218  // Update all buffers with these points
1219  for(auto it=all_buffers.begin(); it!=all_buffers.end(); ++it)
1220  {
1221  for(auto jt=buffered_points_set.begin(); jt!=buffered_points_set.end(); ++jt)
1222  {
1223  it->second->update(*jt);
1224  }
1225  }
1226 
1227  // Determine which points were not already in the buffers
1228  std::set<PPIDpair> new_points;
1229  std::set_difference(buffered_points_set.begin(), buffered_points_set.end(),
1230  initial_buffer_points.begin(), initial_buffer_points.end(),
1231  std::inserter(new_points, new_points.end()));
1232 
1233  // Debug info:
1234  logger() << std::endl
1235  << " Initial N points : "<<initial_buffer_points.size()<<std::endl
1236  << " Final N points : "<<buffered_points_set.size()<<std::endl
1237  << " Difference N points: "<<new_points.size()<<std::endl;
1238 
1239  // Update the buffer variables with the new points
1240  for(auto jt=new_points.begin(); jt!=new_points.end(); ++jt)
1241  {
1242  // Update all buffers with these points
1243  for(auto it=all_buffers.begin(); it!=all_buffers.end(); ++it)
1244  {
1245  it->second->update(*jt);
1246  }
1247  // Update MasterBuffer variables
1248  buffered_points.push_back(*jt);
1249  }
1250 
1251  logger() << std::endl
1252  << "Print buffer now contains "<<get_Npoints()<<" items."
1253  << EOM;
1254  }
1255 
1257  // (only intended to be used when points have been removed from buffers by e.g. MPI-related
1258  // routines like flush_to_vector)
1259  void HDF5MasterBuffer::untrack_points(const std::set<PPIDpair>& removed_points)
1260  {
1261  for(auto pt=removed_points.begin(); pt!=removed_points.end(); ++pt)
1262  {
1263  auto it = buffered_points_set.find(*pt);
1264  if(it!=buffered_points_set.end())
1265  {
1266  buffered_points_set.erase(it);
1267  auto jt = std::find(buffered_points.begin(),buffered_points.end(),*pt);
1268  if(jt!=buffered_points.end())
1269  {
1270  buffered_points.erase(jt);
1271  }
1272  else
1273  {
1274  std::ostringstream errmsg;
1275  errmsg<<"Error untracking point (rank="<<pt->rank<<", pointID="<<pt->pointID<<")! Point was found in tracked set, but not in ordering vector! This is a bug, please report it.";
1276  printer_error().raise(LOCAL_INFO, errmsg.str());
1277  }
1278  }
1279  else
1280  {
1281  std::ostringstream errmsg;
1282  errmsg<<"Could not untrack point (rank="<<pt->rank<<", pointID="<<pt->pointID<<")! Point was not being tracked! This is a bug, please report it.";
1283  printer_error().raise(LOCAL_INFO, errmsg.str());
1284  }
1285  }
1286  }
1287 
1289  #define DEFINE_GET_BUFFER(TYPE)\
1290  template<>\
1291  HDF5Buffer<TYPE>& HDF5MasterBuffer::get_buffer<TYPE>(const std::string& label, const std::vector<PPIDpair>& buffered_points)\
1292  {\
1293  HDF5Buffer<TYPE>& out_buffer = CAT(hdf5_buffers_,TYPE).get_buffer(label,buffered_points);\
1294  /*logger()<<"Updating buffer map with buffer "<<label<<", C++ type="<<typeid(TYPE).name()<<", type ID="<<out_buffer.get_type_id()<<EOM;*/\
1295  update_buffer_map(label,out_buffer);\
1296  return out_buffer;\
1297  }
1298  DEFINE_GET_BUFFER(int )
1300  DEFINE_GET_BUFFER(long )
1302  //DEFINE_GET_BUFFER(longlong ) // Some type ambiguities here between C++ and HDF5, seems like ulong and ulonglong map to same HDF5 type. So ditch ulonglong for now.
1303  //DEFINE_GET_BUFFER(ulonglong)
1304  DEFINE_GET_BUFFER(float )
1305  DEFINE_GET_BUFFER(double )
1306  #undef DEFINE_GET_BUFFER
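  // For reference, DEFINE_GET_BUFFER(double) expands to (roughly):
  //
  //   template<>
  //   HDF5Buffer<double>& HDF5MasterBuffer::get_buffer<double>(const std::string& label, const std::vector<PPIDpair>& buffered_points)
  //   {
  //     HDF5Buffer<double>& out_buffer = hdf5_buffers_double.get_buffer(label,buffered_points);
  //     update_buffer_map(label,out_buffer);
  //     return out_buffer;
  //   }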
1307 
1309 
1311 
1313  HDF5Printer2* HDF5Printer2::link_primary_printer(BasePrinter* const primary)
1314  {
1315  HDF5Printer2* out(NULL);
1316  if(this->is_auxilliary_printer())
1317  {
1318  if(primary==NULL)
1319  {
1320  std::ostringstream errmsg;
1321  errmsg<<"Auxilliary printer was not constructed with a pointer to the primary printer! This is a bug, please report it";
1322  printer_error().raise(LOCAL_INFO, errmsg.str());
1323  }
1324  out = dynamic_cast<HDF5Printer2*>(primary);
1325  }
1326  return out;
1327  }
1328 
1330  HDF5Printer2::HDF5Printer2(const Options& options, BasePrinter* const primary)
1331  : BasePrinter(primary,options.getValueOrDef<bool>(false,"auxilliary"))
1332  , primary_printer(link_primary_printer(primary))
1333  , myRank(0)
1334  , mpiSize(1)
1335 #ifdef WITH_MPI
1336  , myComm() // initially attaches to MPI_COMM_WORLD
1337 #endif
1338  , buffermaster(get_filename(options),get_groupname(options),get_sync(options),get_buffer_length(options)
1339 #ifdef WITH_MPI
1340  , myComm
1341 #endif
1342  )
1343  {
1344 #ifdef WITH_MPI
1345  myRank = myComm.Get_rank();
1346  mpiSize = myComm.Get_size();
1347  this->setRank(myRank); // Tell BasePrinter what rank this process is (for error messages)
1349 #endif
1350  // Set resume flag to match primary printer, and give primary printer a pointer to our buffer master object.
1351  if(this->is_auxilliary_printer())
1352  {
1353  set_resume(get_HDF5_primary_printer()->get_resume());
1355  #ifdef WITH_MPI
1356  myComm = get_HDF5_primary_printer()->get_Comm();
1357  #endif
1358  }
1359  else
1360  {
1361 #ifdef WITH_MPI
1362  // Create a new MPI context for use by this printer
1363  myComm.dup(MPI_COMM_WORLD,"HDF5Printer2Comm"); // duplicates MPI_COMM_WORLD
1364 #endif
1365  // This is the primary printer. Need to determine resume status
1366  set_resume(options.getValue<bool>("resume"));
1367 
1368  // Overwrite output file if one already exists with the same name?
1369  bool overwrite_file = options.getValueOrDef<bool>(false,"delete_file_on_restart");
1370 
1371  // Attempt repairs on existing HDF5 output if inconsistencies detected
1372  bool attempt_repair = !options.getValueOrDef<bool>(false,"disable_autorepair");
1373 
1374  std::vector<ulong> highests(mpiSize);
1375 
1376  std::string file = get_filename();
1377  std::string group = get_groupname();
1378 
1379  if(myRank==0)
1380  {
1381  // Warn about excessive buffer length times mpiSize
1382  if(get_buffer_length() * mpiSize > 1e7)
1383  {
1384  std::ostringstream warn;
1385  warn<<"Warning from HDF5Printer2! Your chosen printer buffer length ("<<get_buffer_length()<<"), multiplied by the number of processes in your job ("<<mpiSize<<"), is very large. During job shutdown the master process receives all remaining print buffer data via MPI, from ALL processes, for efficient writing to disk. This needs to be able to fit into the available RAM. If you are sure that you have enough RAM for this then there is no problem. But please do check, and if needed, reduce the buffer length for this job using the 'buffer_length' option for the printer.";
1386  printer_warning().raise(LOCAL_INFO, warn.str());
1387  }
1388 
1389  // Check whether a readable output file exists with the name that we want to use.
1390  std::string msg_finalfile;
1391  if(HDF5::checkFileReadable(file, msg_finalfile))
1392  {
1393  if(not get_resume())
1394  {
1395  // Note: "not resume" means "start or restart"
1396  if(overwrite_file)
1397  {
1398  // Delete existing output file
1399  std::ostringstream command;
1400  command << "rm -f "<<file;
1401  logger() << LogTags::printers << LogTags::info << "Running shell command: " << command.str() << EOM;
1402  FILE* fp = popen(command.str().c_str(), "r");
1403  if(fp==NULL)
1404  {
1405  // Error running popen
1406  std::ostringstream errmsg;
1407  errmsg << "rank "<<myRank<<": Error deleting existing output file (requested by 'delete_file_on_restart' printer option; target filename is "<<file<<")! popen failed to run the command (command was '"<<command.str()<<"')";
1408  printer_error().raise(LOCAL_INFO, errmsg.str());
1409  }
1410  else if(pclose(fp)!=0)
1411  {
1412  // Command returned exit code!=0, or pclose failed
1413  std::ostringstream errmsg;
1414  errmsg << "rank "<<myRank<<": Error deleting existing output file (requested by 'delete_file_on_restart' printer option; target filename is "<<file<<")! Shell command failed to executed successfully, see stderr (command was '"<<command.str()<<"').";
1415  printer_error().raise(LOCAL_INFO, errmsg.str());
1416  }
1417  }
1418  else
1419  {
1420  // File exists, so check if 'group' is readable, and throw error if it exists
1421  // (we are not resuming, so need an empty group to write into)
1422  hid_t file_id = HDF5::openFile(file);
1423  std::string msg_group;
1424  std::cout << "Group readable: " << file << " , " << group << " : " << HDF5::checkGroupReadable(file_id, group, msg_group) << std::endl;
1425  if(HDF5::checkGroupReadable(file_id, group, msg_group))
1426  {
1427  // Group already exists, error!
1428  std::ostringstream errmsg;
1429  errmsg << "Error preparing pre-existing output file '"<<file<<"' for writing via hdf5printer! The requested output group '"<<group<<" already exists in this file! Please take one of the following actions:"<<std::endl;
1430  errmsg << " 1. Choose a new group via the 'group' option in the Printer section of your input YAML file;"<<std::endl;
1431  errmsg << " 2. Delete the existing group from '"<<file<<"';"<<std::endl;
1432  errmsg << " 3. Delete the existing output file, or set 'delete_file_on_restart: true' in your input YAML file to give GAMBIT permission to automatically delete it (applies when -r/--restart flag used);"<<std::endl;
1433  errmsg << std::endl;
1434  errmsg << "*** Note: This error most commonly occurs when you try to resume a scan that has already finished! ***" <<std::endl;
1435  errmsg << std::endl;
1436  printer_error().raise(LOCAL_INFO, errmsg.str());
1437  }
1438  HDF5::closeFile(file_id);
1439  }
1440  }
1441  }
1442  else
1443  {
1444  // No readable output file exists, so there is nothing to resume from. Deactivate resuming.
1445  set_resume(false); //Tell ScannerBit that it shouldn't resume and do not find highest point.
1446  logger() << LogTags::info << "No previous output file found, treating run as completely new." << EOM;
1447  }
1448 
1449  std::cout <<"Rank "<<myRank<<" resume flag? "<<get_resume()<<std::endl;
1450  if(get_resume())
1451  {
 1453  std::cout <<"Rank "<<myRank<<": output file readable? "<<HDF5::checkFileReadable(file)<<" (filename: "<<file<<")"<<std::endl;
1454  if( HDF5::checkFileReadable(file) )
1455  {
1456  logger() << LogTags::info << "Scanning existing output file, to prepare for adding new data" << EOM;
1457 
1458  // Open HDF5 file
1459  hid_t file_id = HDF5::openFile(file);
1460 
1461  // Check that group is readable
1462  std::string msg2;
1463  if(not HDF5::checkGroupReadable(file_id, group, msg2))
1464  {
1465  // We are supposed to be resuming, but specified group was not readable in the output file, so we can't.
1466  std::ostringstream errmsg;
1467  errmsg << "Error! GAMBIT is in resume mode, however the chosen output system (HDF5Printer) was unable to open the specified group ("<<group<<") within the existing output file ("<<file<<"). Resuming is therefore not possible; aborting run... (see below for IO error message)";
1468  errmsg << std::endl << "(Strictly speaking we could allow the run to continue (if the scanner can find its necessary output files from the last run), however the printer output from that run is gone, so most likely the scan needs to start again).";
1469  errmsg << std::endl << "IO error message: " << msg2;
1470  printer_error().raise(LOCAL_INFO, errmsg.str());
1471  }
1472 
1473  // Cleanup
1474  HDF5::closeFile(file_id);
1475 
1476  // Check output for signs of corruption, and fix them
1477  // For example if a previous hard shutdown occurred, but the file is not corrupted,
1478  // we might nevertheless still be missing data in some datasets.
1479  // We need to look for this and at least make sure the datasets are all sized correctly.
1480  check_consistency(attempt_repair);
1481 
1482  // Output seems to be readable.
1483  // Get previous highest pointID for our rank from the existing output file
1484  // Might take a while, so time it.
1485  std::chrono::time_point<std::chrono::system_clock> start(std::chrono::system_clock::now());
1486  //PPIDpair highest_PPID
1487  std::map<ulong, ulong> highest_PPIDs = get_highest_PPIDs_from_HDF5();
1488  std::chrono::time_point<std::chrono::system_clock> end(std::chrono::system_clock::now());
1489  std::chrono::duration<double> time_taken = end - start;
1490 
1491  for (size_t rank = 0; rank < mpiSize; rank++ )
1492  {
1493  auto it = highest_PPIDs.find(rank);
1494  if (it != highest_PPIDs.end())
1495  highests[rank] = it->second;
1496  else
1497  highests[rank] = 0;
1498  }
1499 
 1500  logger() << LogTags::info << "Extracted the highest pointIDs recorded for each rank during the previous scan (they were "<< highests <<") from the combined output. Operation took "<<std::chrono::duration_cast<std::chrono::seconds>(time_taken).count()<<" seconds." << EOM;
1501 
1502  }
1503  else
1504  {
1505  logger() << LogTags::info << "No previous output file found; therefore no previous MPIrank/pointID pairs to parse. Will assume that this is a new run (-r/--restart flag was not explicitly given)." << EOM;
1506  }
1507 
1508  }
1509  }
1510 
1511  if(myRank==0 and not get_resume())
1512  {
1513  // No previous output; need to create MPIrank and pointID datasets
1514  // so that they can be used to measure nominal dataset lengths
1515  // Need to make sure this is done before other processes try
1516  // to print anything.
1518 
1519  HDF5DataSet<int> mpiranks ("MPIrank");
1520  HDF5DataSet<int> mpiranks_valid("MPIrank_isvalid");
1521  HDF5DataSet<ulong> pointids ("pointID");
1522  HDF5DataSet<int> pointids_valid("pointID_isvalid");
1523 
1525  mpiranks_valid.create_dataset(buffermaster.get_location_id());
1527  pointids_valid.create_dataset(buffermaster.get_location_id());
1528 
1530  }
1531 
1532 #ifdef WITH_MPI
1533  // Resume might have been deactivated due to lack of existing previous output
1534  // Need to broadcast this to all other processes.
1535  std::vector<int> resume_int_buf(1);
1536  resume_int_buf[0] = get_resume();
1537  myComm.Barrier();
1538  myComm.Bcast(resume_int_buf, 1, 0);
1539  set_resume(resume_int_buf.at(0));
1540 
1541  if(get_resume())
1542  {
1543  unsigned long highest;
1544  myComm.Barrier();
1545  myComm.Scatter(highests, highest, 0);
1546  get_point_id() = highest;
1547  }
1548 #else
1549  if(get_resume())
1550  {
1551  get_point_id() = highests[0];
1552  }
1553 #endif
1554  }
1555  }
1556 
1559  void HDF5Printer2::check_consistency(bool attempt_repair)
1560  {
1561  hid_t fid = HDF5::openFile(get_filename(), false, 'r');
1562  hid_t gid = HDF5::openGroup(fid, get_groupname(), true);
1563 
1564  // Get all object names in the group
1565  std::vector<std::string> names = HDF5::lsGroup(gid);
1566  std::vector<std::string> dset_names;
1567 
1568  // Check if all datasets have the same length
1569  bool first=true;
1570  bool problem=false;
1571  std::size_t dset_length = 0;
1572  std::size_t max_dset_length = 0;
1573  std::size_t min_dset_length = 0;
1574  for(auto it = names.begin(); it!=names.end(); ++it)
1575  {
1576  // Check if object is a dataset (could be another group)
1577  if(HDF5::isDataSet(gid, *it))
1578  {
1579  dset_names.push_back(*it);
1580  HDF5DataSetBasic dset(*it);
1581  dset.open_dataset(gid);
1582  std::size_t this_dset_length = dset.get_dset_length();
1583  if(first)
1584  {
1585  min_dset_length = this_dset_length;
1586  first = false;
1587  }
1588  else if(dset_length!=this_dset_length)
1589  {
1590  problem = true;
1591  }
1592  if(this_dset_length>max_dset_length) max_dset_length = this_dset_length;
1593  if(this_dset_length<min_dset_length) min_dset_length = this_dset_length;
1594  dset_length = this_dset_length;
1595  dset.close_dataset();
1596  }
1597  }
1598 
1599  if(problem)
1600  {
1601  // Length inconsistency detected. Need to try and fix it.
1602  // First, need to know "nominal" length. This is the *shortest*
1603  // of the mpiranks/pointids datasets. We will invalidate data
1604  // in any datasets beyond this length.
1605  // Then we need to know the longest actual length of any dataset.
1606  // We will extend all datasets to this size, and set data as invalid
1607  // beyond the nominal length. Shortening datasets doesn't work well
1608  // in HDF5, so it is better to just leave a chunk of invalid data.
1609 
1610  std::ostringstream warn;
1611  warn << "An inconsistency has been detected in the existing HDF5 output! Not all datasets are the same length. This can happen if your previous run was not shut down safely. We will now check all datasets for corruption (this involves reading the whole HDF5 file so may take some time if it is a large file)"<<std::endl;
1612  warn << "Note: longest dataset: "<<max_dset_length<<std::endl;
1613  warn << " shortest dataset: "<<min_dset_length<<std::endl;
1614  std::cerr << warn.str();
1615  printer_warning().raise(LOCAL_INFO, warn.str());
1616 
1617  bool unfixable_problem=false;
1618  std::string unfixable_report;
1619  std::size_t highest_common_readable_index = 0;
1620 
1621  // We already measured the longest dataset length. So now we want to extend all datasets to this length
1622  std::cerr<<std::endl;
1623  for(auto it = dset_names.begin(); it!=dset_names.end(); ++it)
1624  {
1625  std::cerr<<"\rScanning dataset for problems: "<<*it<<" ";
1626  HDF5DataSetBasic dset(*it);
1627  dset.open_dataset(gid);
1628  std::size_t this_dset_length = dset.get_dset_length();
1629 
1630  // Check that the dataset is fully readable
1631  // Also finds highest readable index if dataset is partially readable
1632  std::pair<bool,std::size_t> readable_info = HDF5::checkDatasetReadable(gid, *it);
1633  if(not readable_info.first)
1634  {
1635  std::ostringstream msg;
1636  msg << " Corrupted dataset detected! Highest readable index was "<<readable_info.second<<" (dataset name = "<<*it<<")"<<std::endl;
1637  unfixable_report += msg.str();
1638  if(not unfixable_problem)
1639  {
1640  highest_common_readable_index = readable_info.second;
1641  unfixable_problem = true;
1642  }
1643  else if(readable_info.second < highest_common_readable_index)
1644  {
1645  highest_common_readable_index = readable_info.second;
1646  }
1647  }
1648  dset.close_dataset();
1649 
 1650  // Also need to lower highest_common_readable_index in case some non-corrupt datasets happen to be shorter
1651  if(this_dset_length < highest_common_readable_index) highest_common_readable_index = this_dset_length;
1652 
1653  }
1654  std::cerr<<"\rScan of datasets complete! "<<std::endl;
1655 
1656  if(unfixable_problem)
1657  {
1658  // Dataset corruption detected! Need to abort, but we can tell the user some facts about the problem.
1659  std::ostringstream err;
1660  err<<"Corruption detected in existing datasets! You cannot resume writing to the existing HDF5 file. You may be able to recover from this by copying all readable data from these datasets into a new HDF5 file. We have checked the readability of all datasets and determined that the highest index readable in all datasets is:"<<std::endl;
1661  err<<" "<<highest_common_readable_index<<std::endl;
1662  err<<" A full report on the readability of corrupted datasets is given below:"<<std::endl;
1663  err<<unfixable_report;
1664  printer_error().raise(LOCAL_INFO, err.str());
1665  }
1666 
1667  if(attempt_repair)
1668  {
1669  std::ostringstream warn;
 1670  warn << "Attempting to automatically repair datasets. Lost data will not be recovered, but the file will be restored to a condition in which further data can be added. A report on dataset modifications will be issued below. If you want to disable auto-repair (and get an error message instead) then please set disable_autorepair=true in the YAML options for this printer."<<std::endl;
1671  std::cerr << warn.str();
1672  printer_warning().raise(LOCAL_INFO, warn.str());
1673 
1674  std::ostringstream repair_report;
1675 
1676  // Reopen file in write mode
1677  HDF5::closeGroup(gid);
1678  HDF5::closeFile(fid);
1679  fid = HDF5::openFile(get_filename(), false, 'w');
1680  gid = HDF5::openGroup(fid, get_groupname(), true);
1681 
1682  HDF5DataSet<int> mpiranks ("MPIrank");
1683  HDF5DataSet<int> mpiranks_valid("MPIrank_isvalid");
1684  HDF5DataSet<ulong> pointids ("pointID");
1685  HDF5DataSet<int> pointids_valid("pointID_isvalid");
1686 
1687  mpiranks .open_dataset(gid);
1688  mpiranks_valid.open_dataset(gid);
1689  pointids .open_dataset(gid);
1690  pointids_valid.open_dataset(gid);
1691 
1692  std::size_t nominal_dset_length = mpiranks.get_dset_length();
1693  if(mpiranks_valid.get_dset_length() < nominal_dset_length) nominal_dset_length = mpiranks_valid.get_dset_length();
1694  if(pointids .get_dset_length() < nominal_dset_length) nominal_dset_length = pointids .get_dset_length();
1695  if(pointids_valid.get_dset_length() < nominal_dset_length) nominal_dset_length = pointids_valid.get_dset_length();
1696 
1697  mpiranks .close_dataset();
1698  mpiranks_valid.close_dataset();
1699  pointids .close_dataset();
1700  pointids_valid.close_dataset();
1701 
1702  repair_report << " Nominal dataset length identified as: "<<nominal_dset_length<<std::endl;
1703  repair_report << " Maximum dataset length identified as: "<<max_dset_length<<std::endl;
1704 
1705  // We already measured the longest dataset length. So now we want to extend all datasets to this length
1706  for(auto it = dset_names.begin(); it!=dset_names.end(); ++it)
1707  {
1708  HDF5DataSetBasic dset(*it);
1709  dset.open_dataset(gid);
1710  std::size_t this_dset_length = dset.get_dset_length();
1711  if(max_dset_length > this_dset_length)
1712  {
1713  dset.extend_dset_to(max_dset_length);
1714  repair_report << " Extended dataset from "<<this_dset_length<<" to "<<max_dset_length<<" (name = "<<*it<<")"<<std::endl;
1715  }
1716  dset.close_dataset();
1717  }
1718 
1719  // Next we want to open all the 'isvalid' datasets, and set their entries as 'invalid' between nominal and max lengths.
1720  // Could be that the nominal length is the max length though; in which case no further action is required. That just
1721  // means some datasets were too short. They will automatically be set as invalid in the extended sections.
1722  if(max_dset_length > nominal_dset_length)
1723  {
1724  for(auto it = dset_names.begin(); it!=dset_names.end(); ++it)
1725  {
1726  if(Utils::endsWith(*it,"_isvalid"))
1727  {
1728  HDF5DataSet<int> dset(*it);
1729  std::vector<int> zeros(max_dset_length-nominal_dset_length,0);
1730  dset.write_vector(gid, zeros, nominal_dset_length, true);
1731  repair_report << " Set data as 'invalid' from "<<nominal_dset_length<<" to "<<max_dset_length<<" (name = "<<*it<<")"<<std::endl;
1732  }
1733  }
1734  }
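 // Convention used by the zero-fill above: an _isvalid value of 1 marks a slot that
 // holds a real sample, while 0 marks padding or invalidated slots, which is why the
 // region between the nominal and maximum lengths is explicitly zeroed here.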
1735  repair_report << " Repairs complete."<<std::endl;
1736  std::cerr << repair_report.str();
1737  printer_warning().raise(LOCAL_INFO, repair_report.str());
1738  }
1739  else
1740  {
1741  std::ostringstream err;
1742  err << "Automatic dataset repair has been disabled, and a problem with your HDF5 file was detected, so this run must stop. Please manually repair your HDF5 file, or set disable_autorepair=false in the YAML options for this printer, and then try again.";
1743  printer_error().raise(LOCAL_INFO, err.str());
1744  }
1745  }
1746 
1747  HDF5::closeGroup(gid);
1748  HDF5::closeFile(fid);
1749  }
1750 
1751 
1754  {
1755  // Nothing special required
1756  }
1757 
1760  {
1761  if(is_auxilliary_printer())
1762  {
1763  std::ostringstream errmsg;
 1764  errmsg<<"'add_aux_buffer' function called on auxilliary printer! Only the primary printer should be using this function! This is a bug, please report it.";
1765  printer_error().raise(LOCAL_INFO, errmsg.str());
1766  }
1767  aux_buffers.push_back(&aux_buffermaster);
1768  }
1769 
1770  void HDF5Printer2::initialise(const std::vector<int>&)
1771  {
1772  // This printer doesn't need to do anything here
1773  }
1774 
1775  // Clear all data in buffers and on disk for this printer
1776  void HDF5Printer2::reset(bool /*force*/)
1777  {
1778  //TODO: force flag currently not used; do we need it?
1779  buffermaster.reset();
1780  }
1781 
1782  // Flush data in buffers to disk
1784  {
1785  buffermaster.flush();
1786  }
1787 
1788  // Make sure printer output is fully on disk and safe
1789  // No distinction between final and early termination procedure for this printer.
1790  void HDF5Printer2::finalise(bool /*abnormal*/)
1791  {
1792  // DEBUG h5v2_BLOCK message counter
1793  //recv_counter = 0;
1794  //send_counter = 0;
1795 
1796  // The primary printer will take care of finalising all output.
1797  if(not is_auxilliary_printer())
1798  {
1799  // On HPC systems we are likely to be using hundreds or thousands of processes,
1800  // over a networked filesystem. If each process tries to write to the
1801  // output file all at once, it will create an enormous bottleneck and be very
1802  // slow.
1803  // We thus need to send ALL the buffer data to the master process via MPI,
1804  // and have the master process write everything to disk. Fortunately all the
1805  // processes can be synced here, so we can do some big collective
1806  // operations to send everything.
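 // The finalisation below therefore proceeds in stages (the MPI steps only apply
 // when built WITH_MPI): sort buffers into synchronised and random-access (RA)
 // groups, gather the synchronised buffer data to rank 0 and flush it, record the
 // final dataset size, gather the RA buffer data to rank 0, flush the RA buffers,
 // and finally pad the RA datasets out to the final size.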
1807 
1808  #ifdef WITH_MPI
1809  // Gather and print the sync buffer data
1810  // Sort all buffers into sync/non-sync
1811  std::vector<HDF5MasterBuffer*> sync_buffers;
1812  std::vector<HDF5MasterBuffer*> RA_buffers;
1813  sync_buffers.push_back(&buffermaster);
1814  for(auto it=aux_buffers.begin(); it!=aux_buffers.end(); ++it)
1815  {
1816  if((*it)->is_synchronised())
1817  {
1818  sync_buffers.push_back(*it);
1819  }
1820  else
1821  {
1822  RA_buffers.push_back(*it);
1823  }
1824  }
1825  #ifdef HDF5PRINTER2_DEBUG
1827  logger()<<"# sync printer streams: "<<sync_buffers.size()<<std::endl;
1828  logger()<<"# RA printer streams : "<<RA_buffers.size()<<EOM;
1829  #endif
1830  logger()<<LogTags::printers<<LogTags::info<<"Gathering sync buffer data from all processes to rank 0 process..."<<EOM;
1831  gather_and_print(buffermaster,sync_buffers,true);
1832  #endif
1833 
1834  // Flush remaining buffer data
1837  if(myRank==0)
1838  {
1839  buffermaster.flush(); // Flush the primary printer
1840  for(auto it=aux_buffers.begin(); it!=aux_buffers.end(); ++it)
1841  {
1842  if((*it)->is_synchronised())
1843  {
1844  (*it)->flush();
1845  }
1846  }
1847  }
1848 
1849  // Flush master process RA print buffers
1850 
1851  std::ostringstream buffer_nonempty_report;
1852  bool buffer_nonempty_warn(false);
1853  std::size_t final_size;
1854  if(myRank==0)
1855  {
1858  final_size = buffermaster.get_next_free_position();
1860  std::cout<<"Final dataset size is "<<final_size<<std::endl;
1861  logger()<< LogTags::printers << LogTags::info << "Final dataset size is "<<final_size<<EOM;
1862  }
1863 
1864  #ifdef WITH_MPI
1865  // Gather RA print buffer data from all other processes
1866 
 1867  // Create a dedicated unsynchronised 'aux' buffer handler to receive data from other processes (and also this one!)
1868  HDF5MasterBuffer RAbuffer(get_filename(),get_groupname(),false,get_buffer_length(),myComm);
1869 
1870  // Add it to RA_buffers in case there are none, to satisfy various collective operation requirements
1871  RA_buffers.push_back(&RAbuffer);
1872 
1873  // Gather RA print buffer data from all other processes
 1874  logger()<< LogTags::printers << LogTags::info << "Gathering random-access print buffer data from all processes to the rank 0 process..."<<EOM;
1875  if(myRank==0 and mpiSize>1)
1876  {
1877  // Do the gather
1878  gather_and_print(RAbuffer,RA_buffers,false);
1879  }
1880  else if(myRank>0)
1881  {
1882  // Do the gather
 1883  // Note: the first argument is the target output buffer manager, but it is unused except on rank 0,
 1884  // so just pass any buffer manager object to satisfy the function signature requirements.
1885  gather_and_print(buffermaster,RA_buffers,false);
1886  }
1887 
1888  // Try to flush everything left to disk
1889  add_aux_buffer(RAbuffer); // Make sure to include 'gathered' RA data, if there is any.
1890  #endif
1891  for(auto it=aux_buffers.begin(); it!=aux_buffers.end(); ++it)
1892  {
1893  if(not (*it)->is_synchronised())
1894  {
1895  (*it)->flush();
1896  // Check if everything managed to flush!
1897  if(not (*it)->all_buffers_empty())
1898  {
1899  buffer_nonempty_report<<(*it)->buffer_status();
1900  buffer_nonempty_warn = true;
1901  }
1902  // Make sure final dataset size is correct for the unsynchronised buffers
1903  (*it)->extend_all_datasets_to(final_size);
1904  }
1905  }
1906 
1907  if(myRank==0 and buffer_nonempty_warn)
1908  {
1909  std::ostringstream errmsg;
1910  errmsg<<"\nWarning! Not all 'random access' buffers were successfully flushed to disk! This most often occurs when you resume scanning from a run that did not shut down cleanly (at some point in its history). Hard shutdowns can cause loss of samples, and cause subsequent attempts to write data to the missing points to fail, triggering this warning."<<std::endl;
1911  errmsg<<"A report on the data that could not be written to disk is given below. Please consider the impact of this on the integrity of your results:"<<std::endl;
1912  errmsg<<buffer_nonempty_report.str();
1913  std::cout<<errmsg.str();
1914  printer_warning().raise(LOCAL_INFO, errmsg.str());
1915  }
1916 
1917  logger()<< LogTags::printers << LogTags::info << "HDF5Printer2 output finalisation complete."<<EOM;
1918 
1919  // DEBUG
1920  //logger()<<LogTags::printers<<LogTags::debug<<"h5v2_BLOCK send count: "<<send_counter<<std::endl
1921  // <<"h5v2_BLOCK recv count: "<<recv_counter<<EOM;
1922  }
1923  }
1924 
1928  {
1929  if(not is_auxilliary_printer())
1930  {
1931  std::ostringstream errmsg;
1932  errmsg<<"Attempted to get a pointer of derived class type to primary printer, however this object IS the primary printer! This is a bug, please report it.";
1933  printer_error().raise(LOCAL_INFO, errmsg.str());
1934  }
1935  else if(primary_printer==NULL)
1936  {
1937  std::ostringstream errmsg;
1938  errmsg<<"Attempted to get a pointer of derived class type to primary printer, but it was NULL! This means that this auxilliary printer has not been constructed correctly. This is a bug, please report it.";
1939  printer_error().raise(LOCAL_INFO, errmsg.str());
1940  }
1941  return primary_printer;
1942  }
1943 
1944 
1947  {
1949  }
1950 
1953  {
1954  return buffermaster.get_file();
1955  }
1956 
1959  {
1960  return buffermaster.get_group();
1961  }
1962 
1965  {
1967  }
1968 
1970  std::string HDF5Printer2::get_filename(const Options& options)
1971  {
1972  std::string filename;
1973  if(not this->is_auxilliary_printer())
1974  {
1975  // Name of file where results should ultimately end up
1976  std::ostringstream ff;
1977  if(options.hasKey("output_path"))
1978  {
1979  ff << options.getValue<std::string>("output_path") << "/";
1980  }
1981  else
1982  {
1983  ff << options.getValue<std::string>("default_output_path") << "/";
1984  }
1985  if(options.hasKey("output_file"))
1986  {
1987  ff << options.getValue<std::string>("output_file");
1988  }
1989  else
1990  {
1991  printer_error().raise(LOCAL_INFO, "No 'output_file' entry specified in the options section of the Printer category of the input YAML file. Please add a name there for the output hdf5 file of the scan.");
1992  }
1993  filename = ff.str();
1994  }
1995  else
1996  {
1997  // Get filename from primary printer object
1998  filename = get_HDF5_primary_printer()->get_filename();
1999  }
2000  return filename;
2001  }
2002 
2004  std::string HDF5Printer2::get_groupname(const Options& options)
2005  {
2006  std::string groupname;
2007  if(not is_auxilliary_printer())
2008  {
2009  groupname = options.getValueOrDef<std::string>("/","group");
2010  }
2011  else
2012  {
2013  // Get groupname from primary printer
2014  groupname = get_HDF5_primary_printer()->get_groupname();
2015  }
2016  return groupname;
2017  }
2018 
2020  std::size_t HDF5Printer2::get_buffer_length(const Options& options)
2021  {
2022  std::size_t buflen;
2023  if(not is_auxilliary_printer())
2024  {
2025  buflen = options.getValueOrDef<std::size_t>(1000,"buffer_length");
2026  if(buflen > MAX_BUFFER_SIZE)
2027  {
2028  std::ostringstream errmsg;
2029  errmsg<<"Requested buffer length is larger than maximum allowed (which is MAX_BUFFER_SIZE="<<MAX_BUFFER_SIZE<<"). Please specify a shorter buffer_length for the HDF5 printer.";
2030  printer_error().raise(LOCAL_INFO, errmsg.str());
2031  }
2032  }
2033  else
2034  {
2035  // Get buffer length from primary printer
2037  }
2038  return buflen;
2039  }
2040 
2041  bool HDF5Printer2::get_sync(const Options& options)
2042  {
2043  return options.getValueOrDef<bool>(true,"synchronised");
2044  }
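 // For reference, the options read above correspond to a YAML Printer section along
 // these lines (illustrative sketch only; the values shown are hypothetical):
 //
 // Printer:
 //   printer: hdf5
 //   options:
 //     output_file: "results.hdf5"   # required (see get_filename above)
 //     output_path: "runs/scan1"     # optional; falls back to default_output_path
 //     group: "/"                    # optional; default "/"
 //     buffer_length: 1000           # optional; must not exceed MAX_BUFFER_SIZE
 //     synchronised: true            # optional; default true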
2045 
2046  // Get options required to construct a reader object that can read
2047  // the previous output of this printer.
2049  {
2050  Options options;
2051  // Set options that we need later to construct a reader object for
2052  // previous output, if required.
2053  options.setValue("type", "hdf5");
2054  options.setValue("file", buffermaster.get_file());
2055  options.setValue("group", buffermaster.get_group());
2056  return options;
2057  }
2058 
2059 #ifdef WITH_MPI
2060  GMPI::Comm& HDF5Printer2::get_Comm() {return myComm;}
2062 
2063  // Determine ID codes to use for buffer transmission for all buffers across a collection of buffer managers
2064  std::pair<std::map<std::string,int>,std::vector<std::pair<std::string,int>>> HDF5Printer2::get_buffer_idcodes(const std::vector<HDF5MasterBuffer*>& masterbuffers)
2065  {
2066  logger()<<LogTags::printers<<LogTags::debug<<"Determining ID codes for HDF5 buffers for MPI transmission..."<<EOM;
2067  int rank = myComm.Get_rank();
2068  // Gather information on buffers to be gathered from
2069  // all processes
2070  std::stringstream buffernames;
2071  std::string delim = "`~`";
2072 
2073  // We will pack all the buffer names into one big string, and separate them
2074  // again on each process.
2075  // NOTE! This would be a lot of data to transmit, but actually we only need
2076  // to transmit buffer names that don't already exist in the output file!
2077  std::set<std::pair<std::string,int>> bufdefs;
2078  for(auto bt=masterbuffers.begin(); bt!=masterbuffers.end(); ++bt)
2079  {
2080  std::map<std::string,HDF5BufferBase*> all_buffers = (*bt)->get_all_buffers();
2081  for(auto it=all_buffers.begin(); it!=all_buffers.end(); ++it)
2082  {
2083  if(not it->second->exists_on_disk())
2084  {
2085  bufdefs.insert(std::make_pair(it->first,it->second->get_type_id()));
2086  }
2087  }
2088  }
2089 
2090  // Set made sure name/types were unique, now convert to a big string for transmission
2091  logger()<<LogTags::printers<<LogTags::debug<<"Converting buffer names and types to string for transmission..."<<std::endl;
2092  for(auto it=bufdefs.begin(); it!=bufdefs.end(); ++it)
2093  {
2094  buffernames<<it->first<<delim;
2095  buffernames<<it->second<<delim;
2096  logger()<<" type: "<<it->second<<"; name: "<<it->first<<std::endl;
2097  }
2098  std::string namestr = buffernames.str();
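 // Illustrative wire format (the names and type codes shown are hypothetical): the
 // string built above looks like "LogLike`~`<type>`~`pointID`~`<type>`~`", i.e.
 // alternating name/type tokens separated by the "`~`" delimiter, with a trailing
 // delimiter that later yields the empty element popped off after splitting.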
2099  #ifdef HDF5PRINTER2_DEBUG
2100  logger()<<"Full string to transmit:"<<std::endl;
2101  #endif
2102  logger()<<namestr<<EOM;
2103 
2104  // First gather lengths of strings to receive from all processes
2105  std::vector<int> totallen;
2106  totallen.push_back(namestr.length());
2107  std::vector<int> alllens(myComm.Get_size());
2108  logger()<<LogTags::printers<<LogTags::debug<<"Gathering lengths of string messages..."<<std::endl;
2109  #ifdef HDF5PRINTER2_DEBUG
2110  logger()<<"Initial state: "<<alllens;
2111  #endif
2112  logger()<<EOM;
2113  myComm.Gather(totallen, alllens, 0);
2114  #ifdef HDF5PRINTER2_DEBUG
2115  logger()<<LogTags::printers<<LogTags::debug<<"Final state : "<<alllens<<EOM;
2116  #endif
2117 
2118  std::size_t totalstrsize = 0;
2119  for(auto it=alllens.begin(); it!=alllens.end(); ++it)
2120  {
2121  // Check validity
2122  if(*it<0)
2123  {
2124  std::ostringstream errmsg;
2125  errmsg<<"Received a negative value for the total length of new buffer names from a process! The value might have overflowed! In any case it makes no sense, something bad has happened.";
2126  printer_error().raise(LOCAL_INFO, errmsg.str());
2127  }
2128  totalstrsize += *it;
2129  }
2130 
2131  // Check for int overflow (max single message size)
2132  std::size_t maxint = std::numeric_limits<int>::max();
2133  if(totalstrsize > maxint)
2134  {
2135  std::ostringstream errmsg;
2136  errmsg<<"Complete buffer name message is larger than MPI limits for a single message! (Required size: "<<totalstrsize<<", max size on this system (largest int): "<<maxint<<")";
2137  printer_error().raise(LOCAL_INFO, errmsg.str());
2138  }
2139 
2140  // Now gather all the strings
2141  std::vector<char> sendnames(namestr.begin(), namestr.end());
2142  std::vector<char> recvnames(totalstrsize);
2143  logger()<<LogTags::printers<<LogTags::debug<<"Gathering all buffer name strings..."<<std::endl;
2144  #ifdef HDF5PRINTER2_DEBUG
2145  logger()<<"sendnames:"<<sendnames;
2146  #endif
2147  logger()<<EOM;
2148  myComm.Gatherv(sendnames, recvnames, alllens, 0);
2149  #ifdef HDF5PRINTER2_DEBUG
2150  logger()<<LogTags::printers<<LogTags::debug<<"recvnames:"<<recvnames<<EOM;
2151  #endif
2152 
2153  // Process names and assign IDs
2154  std::stringstream sendbufs;
2155  std::vector<std::pair<std::string,int>> ordered_bufs; // Will only be filled on rank 0
2156  if(rank==0)
2157  {
2158  // Split them back into their individual buffer names and types
2159  std::string recvnames_str(recvnames.begin(), recvnames.end());
2160  std::vector<std::string> all_buf_names_and_types = Utils::split(recvnames_str,delim);
2161  if(!all_buf_names_and_types.empty())
2162  {
2163  all_buf_names_and_types.pop_back(); // Remove trailing empty element
2164  // Make sure result is of even length (always need name + type)
2165  if(all_buf_names_and_types.size() % 2 != 0)
2166  {
2167  std::ostringstream errmsg;
2168  errmsg<<"all_buf_names_and_types vector doesn't have an even number of elements! This is a bug, please report it.";
2169  printer_error().raise(LOCAL_INFO, errmsg.str());
2170  }
2171  }
2172 
2173  std::set<std::pair<std::string,int>> all_buf_pairs; // Remove duplicates via set
2174  logger()<<LogTags::printers<<LogTags::debug<<"Splitting received buffer name string..."<<std::endl;
2175  #ifdef HDF5PRINTER2_DEBUG
2176  logger()<<"Input: "<<recvnames_str<<std::endl;
2177  logger()<<"All size: "<<all_buf_names_and_types.size()<<std::endl;
2178  logger()<<"All: "<<all_buf_names_and_types<<std::endl;
2179  #endif
2180  for(auto it=all_buf_names_and_types.begin(); it!=all_buf_names_and_types.end(); ++it)
2181  {
2182  if(*it!="")
2183  {
2184  std::string name(*it);
2185  ++it;
2186  if(it!=all_buf_names_and_types.end())
2187  {
2188  int type(std::stoi(*it));
2189  all_buf_pairs.insert(std::make_pair(name,type));
2190  logger()<<" Inserted: type:"<<type<<"; name:"<<name<<std::endl;
2191  }
2192  else
2193  {
2194  std::ostringstream errmsg;
2195  errmsg<<"Iterated past end of all_buf_names_and_types! This is a bug, please report it.";
2196  printer_error().raise(LOCAL_INFO, errmsg.str());
2197  }
2198  }
2199  }
2200  logger()<<EOM;
2201 
2202  // Add the buffers that are already on disk
2203  // (do this via any of the buffer managers; they should all be managing buffers in the same
2204  // HDF5 group or else none of this will work anyway)
2205  std::vector<std::pair<std::string,int>> existing_bufs = masterbuffers.at(0)->get_all_dset_names_on_disk();
2206  logger()<<LogTags::printers<<LogTags::debug<<"Adding buffer names already on disk:"<<std::endl;
2207  for(auto it=existing_bufs.begin(); it!=existing_bufs.end(); ++it)
2208  {
2209  all_buf_pairs.insert(*it);
2210  logger()<<" Added: type:"<<it->second<<"; name:"<<it->first<<std::endl;
2211  }
2212  logger()<<EOM;
2213 
2214  // Figure out ID codes for all the buffers based on their order
2215  // in the unique set. Should end up the same on all processes.
2216  ordered_bufs = std::vector<std::pair<std::string,int>>(all_buf_pairs.begin(),all_buf_pairs.end());
2217 
2218  // Prepare buffers for sending. Don't need the types this time.
2219  for(auto it=ordered_bufs.begin(); it!=ordered_bufs.end(); ++it)
2220  {
2221  sendbufs<<it->first<<delim;
2222  }
2223  }
2224  namestr = sendbufs.str(); // might as well re-use these variables
2225  sendnames = std::vector<char>(namestr.begin(), namestr.end());
2226  std::vector<int> size(1);
2227  size[0] = sendnames.size();
2228  logger()<<LogTags::printers<<LogTags::debug<<"Broadcasting size of composited buffer name string"<<std::endl;
2229  #ifdef HDF5PRINTER2_DEBUG
2230  logger()<<" namestr:"<<namestr<<std::endl;
2231  logger()<<" namestr.size():"<<size.at(0);
2232  #endif
2233  logger()<<EOM;
2234  // Broadcast the required recv buffer size
2235  myComm.Bcast(size, 1, 0);
2236  logger()<<LogTags::printers<<LogTags::debug<<"Received size for composited buffer name string:"<<size<<EOM;
2237 
2238  if(rank!=0) sendnames = std::vector<char>(size.at(0));
2239 
2240  // Broadcast the buffer names in the order defining their ID codes
2241  logger()<<LogTags::printers<<LogTags::debug<<"Broadcasting composited buffer name string"<<std::endl;
2242  #ifdef HDF5PRINTER2_DEBUG
2243  logger()<<" sendnames:"<<sendnames;
2244  #endif
2245  logger()<<EOM;
2246  myComm.Bcast(sendnames, size.at(0), 0);
2247  #ifdef HDF5PRINTER2_DEBUG
2248  logger()<<LogTags::printers<<LogTags::debug<<"Received:"<<sendnames<<EOM;
2249  #endif
2250  // Split them back into their individual buffer names
2251  std::string allnames_str(sendnames.begin(), sendnames.end());
2252  std::vector<std::string> buf_order = Utils::split(allnames_str,delim);
2253 
2254  // Compute map of ID codes
2255  std::map<std::string,int> idcodes;
2256  logger()<<LogTags::printers<<LogTags::debug<<"Computing buffer ID codes:"<<std::endl;
2257  if(buf_order.size()>0)
2258  {
2259  for(std::size_t i=0; i<buf_order.size()-1; i++)
2260  {
2261  idcodes[buf_order.at(i)] = i;
2262  logger()<<" "<<i<<": "<<buf_order.at(i)<<std::endl;
2263  }
2264  logger()<<EOM;
2265  }
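 // Note: the loop above stops at buf_order.size()-1 because the trailing delimiter
 // in the broadcast string produces a final empty token; every rank then derives the
 // same ID for a given buffer name simply from its position in the broadcast order.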
2266  return std::make_pair(idcodes,ordered_bufs); // Note: ordered_bufs only filled on rank 0!
2267  }
2268 
2269  // Gather buffer data from all processes via MPI and print it on rank 0
2270  void HDF5Printer2::gather_and_print(HDF5MasterBuffer& out_printbuffer, const std::vector<HDF5MasterBuffer*>& masterbuffers, bool sync)
2271  {
2272  // First need to gatherv information on buffer sizes to be sent
2273  std::vector<unsigned long> pointsbuffers(2);
2274  std::vector<unsigned long> pointsbuffersperprocess(mpiSize*2); // number of points and buffers to be recv'd for each process
2275  std::vector<unsigned long> pointsperprocess(mpiSize);
2276  std::vector<unsigned long> buffersperprocess(mpiSize);
2277 
2278  // Count points and buffers across all buffer manager objects
2279  auto it=masterbuffers.begin();
2280  if(it==masterbuffers.end())
2281  {
2282  std::ostringstream errmsg;
2283  errmsg<<"No buffers supplied for gathering!";
2284  printer_error().raise(LOCAL_INFO, errmsg.str());
2285  }
2286  pointsbuffers[0] = (*it)->get_Npoints();
2287  pointsbuffers[1] = (*it)->get_Nbuffers();
2288  ++it;
2289  for(;it!=masterbuffers.end(); ++it)
2290  {
2291  // In sync case points are shared across all buffer manager objects
2292  // In aux case, points may or may not be shared. For the size
2293  // computations we'll just have to assume they are not shared.
2294  if(not sync) pointsbuffers[0] += (*it)->get_Npoints();
2295  pointsbuffers[1] += (*it)->get_Nbuffers();
2296  }
2297 
2299  logger()<<"Gathering data on number of points and buffers to be transmitted"<<std::endl;
2300  logger()<<"Number of points to send from this process: "<<pointsbuffers.at(0)<<std::endl;
2301  logger()<<"Number of buffers to send from this process: "<<pointsbuffers.at(1)<<std::endl;
2302  logger()<<EOM;
2303 
2304  // Need to do AllGather because all processes need to be able to do the size
2305  // computation to figure out if the main Gatherv operation needs to be split
2306  // into pieces.
2307  myComm.AllGather(pointsbuffers, pointsbuffersperprocess);
2308  for(std::size_t i=0; i<mpiSize; i++)
2309  {
2310  std::size_t j = 2*i;
2311  std::size_t k = 2*i+1;
2312  pointsperprocess[i] = pointsbuffersperprocess.at(j);
2313  buffersperprocess[i] = pointsbuffersperprocess.at(k);
2314  }
2316  logger()<<"Gathered data:"<<std::endl;
2317  logger()<<"Number of points to recv from each process: "<<pointsperprocess<<std::endl;
2318  logger()<<"Number of buffers to recv from each process: "<<buffersperprocess<<std::endl;
2319  logger()<<EOM;
2320 
2321  // Now we know how many points we need to recv per process, can figure out
2322  // how many we can recv at once.
2323  // Need to compute approximate MB of storage required by each process
2324  std::vector<std::vector<int>> groups;
2325  groups.push_back(std::vector<int>());
2326  double running_tot_MB = 0;
2327  for(std::size_t i=0; i<mpiSize; i++)
2328  {
2329  double N_MB = pow(2,-20) * (8.+4.) * (double)buffersperprocess.at(i) * (double)pointsperprocess.at(i);
2330  running_tot_MB += N_MB;
2331  if((running_tot_MB > RAMlimit) and (groups.back().size()>1))
2332  {
2333  // Make a new group each time we go over the RAM limit, so long as the previous group wasn't empty (aside from rank 0 process)
2334  groups.push_back(std::vector<int>());
2335  groups.back().push_back(0); // Always need process 0 in there
2336  }
2337  groups.back().push_back(i);
2338  }
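 // Worked example of the size estimate above (numbers are hypothetical): a process
 // holding 10000 points across 500 buffers contributes roughly
 // 2^-20 * (8+4) * 500 * 10000 bytes ~= 57 MB, i.e. each buffered value is costed at
 // 8 bytes of data plus 4 bytes of validity flag. Processes are accumulated into a
 // group until the running total crosses RAMlimit, at which point a fresh group
 // (always containing rank 0) is started.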
2340  logger()<<"Number of <500MB Gathers to perform: "<<groups.size()<<std::endl;
2341  logger()<<"Assigned communicator groups are:"<<std::endl;
2342  for(auto it=groups.begin(); it!=groups.end(); ++it)
2343  {
2344  logger()<<" "<<(*it)<<std::endl;
2345  }
2346  logger()<<EOM;
2347 
2348  // Get ID codes for the buffers (requires collective operations)
2349  std::pair<std::map<std::string,int>,std::vector<std::pair<std::string,int>>> ids_and_types = get_buffer_idcodes(masterbuffers);
2350  std::map<std::string,int> buf_ids = ids_and_types.first;
2351  std::vector<std::pair<std::string,int>> buf_types = ids_and_types.second;
2352 
2353  for(std::size_t i=0; i<groups.size(); i++)
2354  {
2355  // Create new communicator group for the Gather
2356  std::stringstream ss;
2357  ss<<"Gather group "<<i;
2358  GMPI::Comm subComm(groups.at(i),ss.str());
2359 
2360  // Gather the buffer data from this group of processes to rank 0
2361 
2363  logger()<<"Gathering buffer data for processes "<<groups.at(i)<<EOM;
2364 
2365  // Are we in this group? Communicator is null if not.
2366  if(*(subComm.get_boundcomm())!=MPI_COMM_NULL)
2367  {
2368  //std::cerr<<"rank "<<myRank<<" calling gather_all..."<<std::endl;
2369  std::vector<HDF5bufferchunk> data = gather_all(subComm, masterbuffers, buf_ids);
2370  // Add the data to the output buffermanager on rank 0
2371  if(myRank==0)
2372  {
2373  out_printbuffer.add_to_buffers(data,buf_types);
2374  out_printbuffer.resynchronise();
2375  out_printbuffer.flush();
2376  }
2377  }
2378  }
2379  }
2380 
2381  // Gather (via MPI) all HDF5 buffer chunk data from a set of managed buffers
2382  std::vector<HDF5bufferchunk> HDF5Printer2::gather_all(GMPI::Comm& comm, const std::vector<HDF5MasterBuffer*>& masterbuffers, const std::map<std::string,int>& buf_ids)
2383  {
2384  if(*(comm.get_boundcomm())==MPI_COMM_NULL)
2385  {
2386  std::ostringstream errmsg;
2387  errmsg<<"Attempted to call gather_all with an invalid communicator! This is a bug, please report it.";
2388  printer_error().raise(LOCAL_INFO, errmsg.str());
2389  }
2390 
2391  // Build blocks to be transmitted
2392  std::vector<HDF5bufferchunk> bufchunks;
2393  std::vector<PPIDpair> sub_order;
2394  std::size_t i=0; // point counter
2395  HDF5bufferchunk newchunk;
2396 
2397  // First need to determine all points known to all buffers
2398  // For sync buffers should be the same for all buffer managers,
2399  // but may not be for aux buffers.
2400  std::set<PPIDpair> buffered_points;
2401  for(auto bt=masterbuffers.begin(); bt!=masterbuffers.end(); ++bt)
2402  {
2403  const std::set<PPIDpair>& all_points = (*bt)->get_all_points();
2404  for(auto ct=all_points.begin(); ct!=all_points.end(); ++ct)
2405  {
2406  buffered_points.insert(*ct);
2407  }
2408  }
2409 
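 // A note on the HDF5bufferchunk layout as implied by its usage below: each chunk
 // carries up to SIZE points (pointIDs[] / ranks[]) and up to NBUFFERS buffers
 // (name_id[], values[][] or values_int[][], plus a valid[][] flag per entry), with
 // used_size and used_nbuffers recording how much of the fixed-size arrays is
 // actually meaningful.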
2410  logger()<<LogTags::printers<<LogTags::debug<<"Building buffer chunks for MPI transmission"<<EOM;
2411  for(auto it=buffered_points.begin(); it!=buffered_points.end(); ++it)
2412  {
2413  sub_order.push_back(*it);
2414  newchunk.pointIDs[i] = it->pointID;
2415  newchunk.ranks[i] = it->rank;
2416  i++;
2417  // Collected enough points to fill a chunk yet? If so, create a chunk.
2418  if(sub_order.size()==HDF5bufferchunk::SIZE or std::next(it)==buffered_points.end())
2419  {
2420  #ifdef HDF5PRINTER2_DEBUG
2421  logger()<<"Obtained points for new chunk "<<std::endl;
2422  if(sub_order.size()==HDF5bufferchunk::SIZE)
2423  {
2424  logger()<<"(chunk full):"<<std::endl;
2425  }
2426  else
2427  {
2428  logger()<<"(chunk not full, but no more points to add):"<<std::endl;
2429  }
2430  for(auto ct=sub_order.begin(); ct!=sub_order.end(); ++ct)
2431  {
2432  logger()<<" (rank="<<ct->rank<<", pointID="<<ct->pointID<<")"<<std::endl;
2433  }
2434  #endif
2435 
2436  // Go through all the buffers and add their values for these points
2437  std::size_t j=0; // buffer selection index
2438  for(auto bt=masterbuffers.begin(); bt!=masterbuffers.end(); ++bt)
2439  {
2440  // Check that at least some of the selected points are stored in this set of buffers
2441  const std::set<PPIDpair>& points_in_these_bufs = (*bt)->get_all_points();
2442  std::set<PPIDpair> intersec;
2443  std::set_intersection(sub_order.begin(),sub_order.end(),points_in_these_bufs.begin(),points_in_these_bufs.end(),
2444  std::inserter(intersec,intersec.begin()));
2445  if(intersec.size()>0)
2446  {
2447  std::map<std::string,HDF5BufferBase*> all_buffers = (*bt)->get_all_buffers();
2448  for(auto jt=all_buffers.begin(); jt!=all_buffers.end(); ++jt)
2449  {
2450  bool last_buffer = (std::next(jt)==all_buffers.end() and std::next(bt)==masterbuffers.end());
2451  #ifdef HDF5PRINTER2_DEBUG
2453  #endif
2454  if(HDF5::is_float_type(jt->second->get_type_id()))
2455  {
2456  std::pair<std::vector<double>,std::vector<int>> buffer;
2457  buffer = jt->second->flush_to_vector_dbl(sub_order);
2458  std::vector<double> values = buffer.first;
2459  std::vector<int> valid = buffer.second;
2460  // Check that at least some of this data is valid before wasting a chunk slot on it
2461  auto kt = std::find(valid.begin(), valid.end(), 1);
2462  if(kt!=valid.end())
2463  {
2464  #ifdef HDF5PRINTER2_DEBUG
2465  logger()<<"Adding float values for buffer "<<jt->first<<std::endl;
2466  #endif
2467  newchunk.name_id[j] = buf_ids.at(jt->first);
2468  for(std::size_t i2=0; i2<sub_order.size(); i2++)
2469  {
2470  #ifdef HDF5PRINTER2_DEBUG
2471  logger()<<" value="<<values.at(i2)<<", valid="<<valid.at(i2)<<std::endl;
2472  #endif
2473  newchunk.values[j][i2] = values.at(i2);
2474  newchunk.valid[j][i2] = valid.at(i2);
2475  }
2476  j++; // Move to next buffer slot
2477  }
2478  }
2479  else // int version
2480  {
2481  std::pair<std::vector<long>,std::vector<int>> buffer;
2482  buffer = jt->second->flush_to_vector_int(sub_order);
2483  std::vector<long> values = buffer.first;
2484  std::vector<int> valid = buffer.second;
2485  // Check that at least some of this data is valid before wasting a chunk slot on it
2486  auto kt = std::find(valid.begin(), valid.end(), 1);
2487  if(kt!=valid.end())
2488  {
2489  #ifdef HDF5PRINTER2_DEBUG
2490  logger()<<"Adding int values for buffer "<<jt->first<<std::endl;
2491  #endif
2492  newchunk.name_id[j] = buf_ids.at(jt->first);
2493  for(std::size_t i2=0; i2<sub_order.size(); i2++)
2494  {
2495  #ifdef HDF5PRINTER2_DEBUG
2496  logger()<<" value="<<values.at(i2)<<", valid="<<valid.at(i2)<<std::endl;
2497  #endif
2498  newchunk.values_int[j][i2] = values.at(i2);
2499  newchunk.valid[j][i2] = valid.at(i2);
2500  }
2501  j++; // Move to next buffer slot
2502  }
2503  }
2504  if(j==HDF5bufferchunk::NBUFFERS or last_buffer)
2505  {
2506  // Chunk full, begin another.
2507  if(i>HDF5bufferchunk::SIZE)
2508  {
2509  std::ostringstream errmsg;
2510  errmsg<<"Point counter exceeded allowed chunk size somehow (i>SIZE;"<<i<<">"<<HDF5bufferchunk::SIZE<<"). This is a bug, please report it.";
2511  printer_error().raise(LOCAL_INFO, errmsg.str());
2512  }
2514  {
2515  std::ostringstream errmsg;
2516  errmsg<<"Buffer counter exceeded allowed chunk size somehow (j>NBUFFERS;"<<j<<">"<<HDF5bufferchunk::NBUFFERS<<"). This is a bug, please report it.";
2517  printer_error().raise(LOCAL_INFO, errmsg.str());
2518  }
2519  newchunk.used_size = i;
2520  newchunk.used_nbuffers = j;
2521  bufchunks.push_back(newchunk);
2522  #ifdef HDF5PRINTER2_DEBUG
2523  if(last_buffer)
2524  {
2525  logger()<<"Chunk finished (no more buffer data for these points) (used_size="<<i<<", used_nbuffers="<<j<<")"<<EOM;
2526  }
2527  else
2528  {
2529  logger()<<"Chunk finished (no space for more buffers) (used_size="<<i<<", used_nbuffers="<<j<<")"<<EOM;
2531  logger()<<"Beginning new chunk with same points as last"<<std::endl;
2532  }
2533  #endif
2534  // Reset chunk (can leave the data, will be ignored so long as these counters are reset)
2535  newchunk.used_size = 0;
2536  newchunk.used_nbuffers = 0;
2537  j=0;
2538  }
2539  }
2540  // Inform master buffer that we have removed some points from it
2541  (*bt)->untrack_points(intersec);
2542  }
2543  else if(j!=0)
2544  {
2545  // else skip these buffers; the current point isn't in them (must have come from one of the other sets of buffers).
2546  // But if this was the last buffer manager then need to close off the chunk.
2547  if(i>HDF5bufferchunk::SIZE)
2548  {
2549  std::ostringstream errmsg;
2550  errmsg<<"Point counter exceeded allowed chunk size somehow (i>SIZE;"<<i<<">"<<HDF5bufferchunk::SIZE<<"). This is a bug, please report it.";
2551  printer_error().raise(LOCAL_INFO, errmsg.str());
2552  }
2554  {
2555  std::ostringstream errmsg;
2556  errmsg<<"Buffer counter exceeded allowed chunk size somehow (j>NBUFFERS;"<<j<<">"<<HDF5bufferchunk::NBUFFERS<<"). This is a bug, please report it.";
2557  printer_error().raise(LOCAL_INFO, errmsg.str());
2558  }
2559  newchunk.used_size = i;
2560  newchunk.used_nbuffers = j;
2561  bufchunks.push_back(newchunk);
2562  #ifdef HDF5PRINTER2_DEBUG
2563  logger()<<"Chunk finished (no more buffer data for these points; skipped last masterbuffer) (used_size="<<i<<", used_nbuffers="<<j<<")"<<EOM;
2564  #endif
2565  // Reset chunk (can leave the data, will be ignored so long as these counters are reset)
2566  newchunk.used_size = 0;
2567  newchunk.used_nbuffers = 0;
2568  }
2569  else
2570  {
2571  // Skipped last buffer, but was left with an empty chunk. No big deal, just note it in the logs
2572  #ifdef HDF5PRINTER2_DEBUG
 2573  logger()<<"Chunk finished (no more buffer data for these points; skipped last masterbuffer); chunk remains empty, discarding it."<<EOM;
2574  #endif
2575  newchunk.used_size = 0;
2576  newchunk.used_nbuffers = 0;
2577  }
2578  }
2579  i=0;
2580  sub_order.clear();
2581  }
2582  }
2583 
2584  // Transmit information about number of blocks to transmit
2585  std::vector<int> nblocks;
2586  std::vector<int> all_nblocks(comm.Get_size());
2587  nblocks.push_back(bufchunks.size());
2589  logger()<<"Gathering buffer block size data (number of buffer blocks to transmit from this process: "<<nblocks.at(0)<<")"<<EOM;
2590  comm.Gather(nblocks, all_nblocks, 0);
2591 
2592  // Transmit blocks
2593  std::size_t total_nblocks = 0;
2594  if(comm.Get_rank()==0)
2595  {
2597  logger()<<"Number of blocks to recv from each process: "<<all_nblocks<<std::endl;
2598  for(auto it=all_nblocks.begin(); it!=all_nblocks.end(); ++it)
2599  {
2600  total_nblocks += *it;
2601  }
2602  logger()<<"Total: "<<total_nblocks<<std::endl;
2603  logger()<<EOM;
2604  }
2605  std::vector<HDF5bufferchunk> all_bufblocks(total_nblocks);
2607  logger()<<"Performing Gatherv of buffer data blocks..."<<EOM;
2608  comm.Gatherv(bufchunks, all_bufblocks, all_nblocks, 0); // (sendbuf, recvbuf, recvcounts, root)
2609 
2610  #ifdef HDF5PRINTER2_DEBUG
2611  if(comm.Get_rank()==0)
2612  {
2613  // Check that received data makes sense.
2614  logger()<<LogTags::printers<<LogTags::debug<<"Checking received data..."<<std::endl;
2615  int i = 0;
2616  int globi = 0;
2617  int rrank = 0;
2618  for(auto it=all_bufblocks.begin(); it!=all_bufblocks.end(); ++it)
2619  {
2620  if(i==all_nblocks.at(rrank)) { rrank++; i=0; }
2621  logger()<<" block "<<globi<<" ("<<i<<"), received from rank "<<rrank<<", used_size="<<it->used_size<<std::endl;
2622  for(int j=0; j<it->used_size; j++)
2623  {
2624  logger()<<" ranks["<<j<<"]="<<it->ranks[j]<<", pointIDs["<<j<<"]="<<it->pointIDs[j]<<std::endl;
2625  }
2626  i++; globi++;
2627  }
2628  }
2629  #endif
2630 
2631  return all_bufblocks;
2632  }
2633 #endif
2634 
2636 
2637  }
2638 }
2639 