gambit is hosted by Hepforge, IPPP Durham
GAMBIT  v1.5.0-2191-ga4742ac
a Global And Modular Bsm Inference Tool
hdf5printer_v2.hpp
Go to the documentation of this file.
1 // GAMBIT: Global and Modular BSM Inference Tool
2 // *********************************************
30 
31 
32 #ifndef __hdf5printer_v2_hpp__
33 #define __hdf5printer_v2_hpp__
34 
35 #include <algorithm>
36 #include <set>
37 #include <iterator>
38 #include <string>
39 
40 // BOOST_PP
41 #include <boost/preprocessor/seq/for_each_i.hpp>
42 
43 // GAMBIT
47 #include "gambit/Utils/cats.hpp"
52 #include "gambit/Logs/logger.hpp"
53 
54 // Activate extra debug logging (warning, LOTS of output)
55 //#define HDF5PRINTER2_DEBUG
56 
57 namespace Gambit
58 {
59  namespace Printers
60  {
61 
62  typedef unsigned int uint;
63  typedef unsigned long ulong;
64  typedef long long longlong;
65  typedef unsigned long long ulonglong;
66 
67  // DEBUG h5v2_BLOCK message counters
68  //static int recv_counter;
69  //static int send_counter;
70 
73  static const std::size_t HDF5_CHUNKLENGTH = 100;
74 
76  static const std::size_t DSETRANK = 1;
77 
79  static const std::size_t MAX_BUFFER_SIZE = 100000;
80 
82  const int h5v2_bufname(10);
83  const int h5v2_bufdata_points(11);
84  const int h5v2_bufdata_ranks(12);
85  const int h5v2_bufdata_valid(13);
86  const int h5v2_bufdata_type(14);
87  const int h5v2_bufdata_values(15);
88  // Message block end tag:
89  // 1 means "there is another block of messages to receive"
90  // 0 means "there are no more blocks of messages to receive"
91  const int h5v2_BLOCK(30);
92  // "Begin sending data" tag
93  const int h5v2_BEGIN(31);
94 
95  // The 'h5v2_bufdata_type' messages send an integer encoding
96  // the datatype for the h5v2_bufdata_values messages
97  // Need a unique integer for each type. We can encode these
98  // with a template function:
99 
100  template<class T>
101  std::set<T> set_diff(const std::set<T>& set1, const std::set<T>& set2)
102  {
103  std::set<T> result;
104  std::set_difference(set1.begin(), set1.end(), set2.begin(), set2.end(),
105  std::inserter(result, result.end()));
106  return result;
107  }
108 
111  {
112  public:
113  HDF5DataSetBase(const std::string& name, const hid_t hdftype_id);
114  HDF5DataSetBase(const std::string& name);
115  virtual ~HDF5DataSetBase();
116 
118  void open_dataset(hid_t location_id);
119 
121  void close_dataset();
122 
125  virtual void create_dataset(hid_t location_id) = 0;
126 
128  std::size_t get_dset_length() const;
129 
131  bool dataset_exists(const hid_t loc_id);
132 
134  void ensure_dataset_exists(const hid_t loc_id, const std::size_t length);
135 
137  void extend_dset_to(const std::size_t new_size);
138 
140  std::string myname() const;
141 
143  int get_type_id() const;
144 
146  hid_t get_hdftype_id() const;
147 
149  bool get_exists_on_disk() const;
150  void set_exists_on_disk();
151 
152  private:
153 
154  // Dataset and chunk dimension specification arrays
155  // We are only using 1D output datasets for simplicity.
156  // Values are only valid if 'is_open==true'
157  hsize_t dims [DSETRANK];
158  hsize_t maxdims [DSETRANK];
159  hsize_t chunkdims[DSETRANK];
160  //hsize_t slicedims[DSETRANK];
161  // Note, dims[0] is current size of dataset, so next unused index is equal to dims[0]
162 
164  std::string _myname;
165 
167  bool is_open;
168 
171 
172  protected:
173 
176 
178  void ensure_dataset_is_open() const;
179 
181  hid_t get_dset_id() const;
182 
184  //void set_dset_length(const std::size_t newsize);
185 
187  void extend_dset_by(const std::size_t extend_by);
188 
190  std::pair<hid_t,hid_t> select_hyperslab(std::size_t offset, std::size_t length) const;
191 
194 
196  int type_id;
197  };
198 
201  {
202  public:
203  HDF5DataSetBasic(const std::string& name);
204  void create_dataset(hid_t location_id);
205  };
206 
208  template<class T>
210  {
211  public:
212 
214  HDF5DataSet(const std::string& name)
215  : HDF5DataSetBase(name,get_hdf5_data_type<T>::type())
216  {}
217 
219  std::size_t write_vector(const hid_t loc_id, const std::vector<T>& data, const std::size_t target_pos, const bool force=false)
220  {
221  open_dataset(loc_id);
222  bool all_data_written=false;
223  T buffer[MAX_BUFFER_SIZE];
224  std::size_t i = 0;
225  std::size_t offset = target_pos;
226  //std::cout<<"Preparing to write "<<data.size()<<" elements to dataset "<<myname()<<" at position "<<target_pos<<std::endl;
227  while(not all_data_written)
228  {
229  std::size_t j;
230  // Copy data into buffer up to MAX_BUFFER_SIZE
231  for(j=0;
232  (j<MAX_BUFFER_SIZE) && (i<data.size());
233  ++j, ++i)
234  {
235  // DEBUG inspect buffer
236  //std::cout<< " buffer["<<j<<"] = data.at("<<i<<") = "<<data.at(i)<<std::endl;
237  buffer[j] = data.at(i);
238  }
239  //std::cout<<" i="<<i<<", j="<<j<<", data.size()="<<data.size()<<", MAX_BUFFER_SIZE="<<MAX_BUFFER_SIZE<<std::endl;
240  // Write buffer to disk
241  //std::cout<<"Writing "<<j<<" elements to dataset "<<myname()<<" at position "<<offset<<std::endl;
242  write_buffer(buffer,j,offset,force);
243  offset += j;
244  if(i==data.size()) all_data_written = true;
245  }
246  std::size_t new_dset_size = get_dset_length();
247  close_dataset();
248 
249  // Report new size of the dataset so that we can check that all datasets are the same length
250  return new_dset_size;
251  }
252 
259  void write_buffer(const T (&buffer)[MAX_BUFFER_SIZE], const std::size_t length, const std::size_t target_pos, const bool force=false)
260  {
261  if(length>MAX_BUFFER_SIZE)
262  {
263  std::ostringstream errmsg;
264  errmsg << "Error! Received buffer with length ("<<length<<") greater than MAX_BUFFER_SIZE ("<<MAX_BUFFER_SIZE<<") while tring to perform block write for dataset (name="<<myname()<<"). The input to this function is therefore invalid.";
265  printer_error().raise(LOCAL_INFO, errmsg.str());
266  }
267  if(length==0)
268  {
269  std::ostringstream errmsg;
270  errmsg << "Error! Received buffer of length zero! This will cause an error when trying to select element for writing, and there is no point calling this function with no points to write anyway. Please review the input to this function (error occurred while tring to perform block write for dataset (name="<<myname()<<"))";
271  printer_error().raise(LOCAL_INFO, errmsg.str());
272  }
273 
274  // DEBUG dump whole buffer up to length to check it
275  //for(std::size_t i=0;i<length;++i)
276  //{
277  // std::cout<<" buffer["<<i<<"] = "<<buffer[i]<<std::endl;
278  //}
279 
281 
282  // Get the C interface identifier for the type of the output dataset
283  hid_t expected_dtype = get_hdftype_id();
284  hid_t dtype = H5Dget_type(get_dset_id()); // type with which the dset was created
285  if(not H5Tequal(dtype, expected_dtype))
286  {
287  std::ostringstream errmsg;
288  errmsg << "Error! Tried to write to dataset (name="<<myname()<<") with type id "<<dtype<<" but expected it to have type id "<<expected_dtype<<". This is a bug, please report it.";
289  printer_error().raise(LOCAL_INFO, errmsg.str());
290  }
291 
292  std::size_t required_size = target_pos+length;
293  // Check that target position is allowed
294  if(target_pos < get_dset_length())
295  {
296  if(force)
297  {
298  if(required_size > get_dset_length())
299  {
300  // Some overlap into unused space, partial dataset extension required.
301  extend_dset_to(required_size);
302  }
303  // Else whole target block is inside current dataset size. No extension required.
304  }
305  else
306  {
307  std::ostringstream errmsg;
308  errmsg << "Error! Tried to write block to dataset (name="<<myname()<<"), but target index ("<<target_pos<<") is inside the current dataset extents (dset size="<<get_dset_length()<<"), i.e. some of the target slots are already used! This is a bug, please report it.";
309  printer_error().raise(LOCAL_INFO, errmsg.str());
310  }
311  }
312  else
313  {
314  // Target block fully outside current dataset extents. Extend to fit.
315  extend_dset_to(required_size);
316  }
317 
318  // Select output hyperslab
319  // (this also determines what data will be read out of the buffer)
320  std::pair<hid_t,hid_t> selection_ids = select_hyperslab(target_pos,length);
321  hid_t memspace_id = selection_ids.first;
322  hid_t dspace_id = selection_ids.second;
323 
324  // Write the data to the hyperslab.
325  herr_t status = H5Dwrite(get_dset_id(), get_hdftype_id(), memspace_id, dspace_id, H5P_DEFAULT, buffer);
326  if(status<0)
327  {
328  std::ostringstream errmsg;
329  errmsg << "Error writing new chunk to dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Dwrite failed." << std::endl;
330  printer_error().raise(LOCAL_INFO, errmsg.str());
331  }
332 
333  // Release the hyperslab IDs
334  H5Sclose(dspace_id);
335  H5Sclose(memspace_id);
336 
337  //std::cout<<"write_buffer finished; new dataset size is: "<<get_dset_length()<<std::endl;
338  }
339 
341  void write_random(const hid_t loc_id, const std::map<std::size_t,T>& data)
342  {
343  open_dataset(loc_id);
344  bool all_data_written=false;
345  T buffer[MAX_BUFFER_SIZE];
346  hsize_t coords[MAX_BUFFER_SIZE];
347  auto it = data.begin();
348  while(not all_data_written)
349  {
350  // Copy data into buffer up to MAX_BUFFER_SIZE
351  std::size_t j;
352  for(j=0;
353  (j<MAX_BUFFER_SIZE) && (it!=data.end());
354  ++j, ++it)
355  {
356  buffer[j] = it->second;
357  coords[j] = it->first;
358  }
359  // Write buffer to disk
360  if(j>0) write_RA_buffer(buffer,coords,j);
361  if(it==data.end()) all_data_written = true;
362  }
363  close_dataset();
364  }
365 
367  void write_RA_buffer(const T (&buffer)[MAX_BUFFER_SIZE], const hsize_t (&coords)[MAX_BUFFER_SIZE], std::size_t npoints)
368  {
369  if(npoints>MAX_BUFFER_SIZE)
370  {
371  std::ostringstream errmsg;
372  errmsg << "Error! Received npoints ("<<npoints<<") greater than MAX_BUFFER_SIZE ("<<MAX_BUFFER_SIZE<<") while tring to perform RA write for dataset (name="<<myname()<<"). The input to this function is therefore invalid.";
373  printer_error().raise(LOCAL_INFO, errmsg.str());
374  }
375  if(npoints==0)
376  {
377  std::ostringstream errmsg;
378  errmsg << "Error! Received npoints=0! This will cause an error when trying to select element for writing, and there is no point calling this function with no points to write anyway. Please review the input to this function (error occurred while tring to perform RA write for dataset (name="<<myname()<<"))";
379  printer_error().raise(LOCAL_INFO, errmsg.str());
380  }
381 
383 
384  bool error_occurred = false; // simple error flag
385 
386  // DEBUG: check coords array
387  //for(std::size_t i=0; i<npoints; i++) std::cout<<"coords["<<i<<"] = "<<coords[i]<<std::endl;
388 
389  // Check that no data is to be written outside the current dataset extents. This
390  // function is only for writing back to points that already exist!
391  std::size_t max_coord = *std::max_element(coords,coords+npoints);
392  if(max_coord > get_dset_length())
393  {
394  std::ostringstream errmsg;
395  errmsg<<"Attempted to perform RA write to a point outside the current dataset extents (max_coord="<<max_coord<<", dset_length="<<get_dset_length()<<")! The dataset should be resized prior to calling this function, so this is a bug, please report it.";
396  printer_error().raise(LOCAL_INFO, errmsg.str());
397  }
398 
399  // Dataset size in memory
400  static const std::size_t MDIM_RANK = 1;
401  hsize_t mdim[] = {npoints};
402 
403  // Dataspace for the output values
404  hid_t dspace = H5Screate_simple(MDIM_RANK, mdim, NULL);
405  if(dspace<0) error_occurred = true;
406 
407  // Get the C interface identifier for a copy of the dataspace
408  // of the dataset
409  hid_t dspace_id = H5Dget_space(get_dset_id());
410  if(dspace_id<0) error_occurred = true;
411 
412  // Select the target write points in the file dataspace
413  hid_t errflag = H5Sselect_elements(dspace_id, H5S_SELECT_SET, npoints, coords);
414  if(errflag<0) error_occurred = true;
415 
416  // Get the C interface identifier for the type of the output dataset
417  hid_t expected_dtype = get_hdftype_id();
418  hid_t dtype = H5Dget_type(get_dset_id()); // type with which the dset was created
419  if(not H5Tequal(dtype, expected_dtype))
420  {
421  std::ostringstream errmsg;
422  errmsg << "Error! Tried to write to dataset (name="<<myname()<<") with type id "<<dtype<<" but expected it to have type id "<<expected_dtype<<". This is a bug, please report it.";
423  printer_error().raise(LOCAL_INFO, errmsg.str());
424  }
425 
426  // Write data to selected points
427  // (H5P_DEFAULT specifies some transfer properties for the I/O
428  // operation. These are the default values, probably are ok.)
429  hid_t errflag2 = H5Dwrite(get_dset_id(), dtype, dspace, dspace_id, H5P_DEFAULT, buffer);
430 
431  if(errflag2<0) error_occurred = true;
432 
433  if(error_occurred)
434  {
435  std::ostringstream errmsg;
436  errmsg << "Error! Failed to write desynchronised buffer data to file! (dataset name="<<myname()<<")"<<std::endl
437  << "Error flags were:" << std::endl
438  << " dspace : " << dspace << std::endl
439  << " dspace_id: " << dspace_id << std::endl
440  << " errflag : " << errflag << std::endl
441  << " errflag2 : " << errflag2 << std::endl
442  << "Variables:" << std::endl
443  << " dtype = " << dtype;
444  printer_error().raise(LOCAL_INFO, errmsg.str());
445  }
446 
447  H5Tclose(dtype);
448  H5Sclose(dspace_id);
449  H5Sclose(dspace);
450  }
451 
453  std::vector<T> get_chunk(std::size_t offset, std::size_t length) const
454  {
455  // Buffer to receive data (and return from function)
456  std::vector<T> chunkdata(length);
457 
458  // Select hyperslab
459  std::pair<hid_t,hid_t> selection_ids = select_hyperslab(offset,length);
460  hid_t memspace_id = selection_ids.first;
461  hid_t dspace_id = selection_ids.second;
462 
463  // Buffer to receive data
464  void* buffer = &chunkdata[0]; // pointer to contiguous memory within the buffer vector
465 
466  // Get the data from the hyperslab.
467  herr_t err_read = H5Dread(get_dset_id(), get_hdftype_id(), memspace_id, dspace_id, H5P_DEFAULT, buffer);
468 
469  if(err_read<0)
470  {
471  std::ostringstream errmsg;
472  errmsg << "Error retrieving chunk (offset="<<offset<<", length="<<length<<") from dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Dread failed." << std::endl;
473  errmsg << " offset+length = "<< offset+length << std::endl;
474  errmsg << " dset_length() = "<< get_dset_length() << std::endl;
475  printer_error().raise(LOCAL_INFO, errmsg.str());
476  }
477 
478  H5Sclose(dspace_id);
479  H5Sclose(memspace_id);
480 
481  return chunkdata;
482  }
483 
487  void reset(hid_t loc_id)
488  {
489  if(dataset_exists(loc_id))
490  {
491  open_dataset(loc_id);
492  std::size_t remaining_length = get_dset_length();
493  close_dataset();
494  std::size_t target_pos = 0;
495  while(remaining_length>0)
496  {
497  std::vector<T> zero_buffer;
498  if(remaining_length>=MAX_BUFFER_SIZE)
499  {
500  zero_buffer = std::vector<T>(MAX_BUFFER_SIZE);
501  remaining_length -= MAX_BUFFER_SIZE;
502  }
503  else
504  {
505  zero_buffer = std::vector<T>(remaining_length);
506  remaining_length = 0;
507  }
508  write_vector(loc_id, zero_buffer, target_pos, true);
509  target_pos += MAX_BUFFER_SIZE;
510  }
511  }
512  // else the dataset doesn't even exist yet (no buffer flushes have occurred yet),
513  // so don't need to reset anything.
514  }
515 
517  void create_dataset(hid_t location_id);
518 
519  };
520 
522  template<class T>
524  {
525  hsize_t dims [DSETRANK];
526  hsize_t maxdims [DSETRANK];
527  hsize_t chunkdims[DSETRANK];
528  //hsize_t slicedims[DSETRANK];
529 
530  // Compute initial dataspace and chunk dimensions
531  dims[0] = 0; // Empty to start
532  maxdims[0] = H5S_UNLIMITED; // No upper limit on number of records allowed in dataset
533  chunkdims[0] = HDF5_CHUNKLENGTH;
534  //slicedims[0] = 1; // Dimensions of a single record in the data space
535 
536  // Create the data space
537  hid_t dspace_id = H5Screate_simple(DSETRANK, dims, maxdims);
538  if(dspace_id<0)
539  {
540  std::ostringstream errmsg;
541  errmsg << "Error creating dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Screate_simple failed.";
542  printer_error().raise(LOCAL_INFO, errmsg.str());
543  }
544 
545  // Object containing dataset creation parameters
546  hid_t cparms_id = H5Pcreate(H5P_DATASET_CREATE);
547  if(cparms_id<0)
548  {
549  std::ostringstream errmsg;
550  errmsg << "Error creating dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Pcreate failed.";
551  printer_error().raise(LOCAL_INFO, errmsg.str());
552  }
553 
554  herr_t status = H5Pset_chunk(cparms_id, DSETRANK, chunkdims);
555  if(status<0)
556  {
557  std::ostringstream errmsg;
558  errmsg << "Error creating dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Pset_chunk failed.";
559  printer_error().raise(LOCAL_INFO, errmsg.str());
560  }
561 
562  // Check if location id is invalid
563  if(location_id==-1)
564  {
565  std::ostringstream errmsg;
566  errmsg << "Error! Tried to create hdf5 dataset (with name=\""<<myname()<<"\") at undefined location (location_id was -1). Please check that calling code supplied a valid location handle. This is a bug, please report it.";
567  printer_error().raise(LOCAL_INFO, errmsg.str());
568  }
569 
570  // Create the dataset
571  hid_t dset_id = H5Dcreate2(location_id, myname().c_str(), get_hdftype_id(), dspace_id, H5P_DEFAULT, cparms_id, H5P_DEFAULT);
572  if(dset_id<0)
573  {
574  std::ostringstream errmsg;
575  errmsg << "Error creating dataset (with name=\""<<myname()<<"\") in HDF5 file. Dataset with same name may already exist";
576  printer_error().raise(LOCAL_INFO, errmsg.str());
577  }
578 
579  // Release the dataspace IDs
580  H5Sclose(dspace_id);
581  H5Pclose(cparms_id);
582  H5Dclose(dset_id);
583 
584  // Register that the dataset now exists on disk
586  }
587 
588 
591  {
592  public:
593 
595  HDF5BufferBase(const std::string& name, const bool sync);
596 
598  virtual ~HDF5BufferBase() {}
599 
601  std::string dset_name() const;
602 
604  virtual bool exists_on_disk() const = 0;
605 
607  virtual void update(const PPIDpair& ppid) = 0;
608 
610  virtual void block_flush(const hid_t loc_id, const std::vector<PPIDpair>& order, const std::size_t target_pos) = 0;
611 
613  virtual void random_flush(const hid_t loc_id, const std::map<PPIDpair,std::size_t>& position_map) = 0;
614 
615  // Retrieve buffer data in specified order (leaving it empty!) along with type ID in
616  // As a double.
617  virtual std::pair<std::vector<double>,std::vector<int>> flush_to_vector_dbl(const std::vector<PPIDpair>& order) = 0;
618  // int version
619  virtual std::pair<std::vector<long>,std::vector<int>> flush_to_vector_int(const std::vector<PPIDpair>& order) = 0;
620 
621 #ifdef WITH_MPI
622  virtual void MPI_flush_to_rank(const unsigned int r) = 0;
624 
625 #endif
626 
628  virtual void ensure_dataset_exists(const hid_t loc_id, const std::size_t length) = 0;
629 
631  virtual void reset(hid_t loc_id) = 0;
632 
633  // Report whether this buffer is synchronised
634  bool is_synchronised() const;
635 
636  // Report the number of items currently in the buffer;
637  virtual std::size_t N_items_in_buffer() = 0;
638 
640  std::set<PPIDpair> get_points_set() const;
641 
643  virtual int get_type_id() const = 0;
644 
645  private:
646 
648  std::string _dset_name;
649 
653 
654  protected:
655 
657  std::set<PPIDpair> buffer_set;
658 
660  std::map<PPIDpair,int> buffer_valid;
661  };
662 
664  template<class T>
666  {
667  public:
668 
670  HDF5Buffer(const std::string& name, const bool sync, const std::vector<PPIDpair>& buffered_points
671 #ifdef WITH_MPI
672  // Gambit MPI communicator context for use within the hdf5 printer system
673  , GMPI::Comm& comm
674 #endif
675  )
676  : HDF5BufferBase(name,sync)
677  , my_dataset(name)
678  , my_dataset_valid(name+"_isvalid")
679 #ifdef WITH_MPI
680  , myComm(comm)
681 #endif
682  {
683  // Add points known to other buffers (as 'invalid' data, for synchronisation purposes)
684  for(auto it=buffered_points.begin(); it!=buffered_points.end(); ++it)
685  {
686  update(*it);
687  }
688  }
689 
691  void update(const PPIDpair& ppid)
692  {
693  buffer[ppid]; // Create point with default value if it doesn't exist
694  buffer_set.insert(ppid);
695  auto it = buffer_valid.find(ppid);
696  // If point not already in the buffer, set it as invalid
697  if(it==buffer_valid.end())
698  {
699  buffer_valid[ppid] = 0;
700  // DEBUG
701  //std::cout<<"Set point "<<ppid<<" to 'invalid' for buffer "<<dset_name()<<std::endl;
702  }
703  }
704 
706  void append(T const& value, const PPIDpair& ppid)
707  {
708  buffer [ppid] = value;
709  buffer_valid[ppid] = 1;
710  buffer_set.insert(ppid);
711  //logger()<<" ***Added valid data "<<value<<" to point "<<ppid<<" in buffer "<<dset_name()<<std::endl;
712  }
713 
716  void block_flush(const hid_t loc_id, const std::vector<PPIDpair>& order, const std::size_t target_pos)
717  {
718  // Make sure output order is same size as the buffer to be output
719  if(order.size() != buffer.size())
720  {
721  std::ostringstream errmsg;
722  errmsg << "Supplied buffer ordering vector is not the same size as the buffer (buffer.size()="<<buffer.size()<<", order.size()="<<order.size()<<"; dset_name()="<<dset_name()<<"). This is a bug, please report it." <<std::endl;
723  errmsg << "Extra debug information:" << std::endl;
724  errmsg << " buffer.size() = "<<buffer.size()<<std::endl;
725  errmsg << " buffer_valid.size() = "<<buffer_valid.size()<<std::endl;
726  errmsg << " buffer_set.size() = "<<buffer_set.size()<<std::endl;
727  printer_error().raise(LOCAL_INFO, errmsg.str());
728  }
729 
730  // Need to keep track of whether buffer points have been added to the ordered output
731  std::set<PPIDpair> done;
732 
733  // Create a vector version of the buffer in the specified order
734  std::vector<T> ordered_buffer;
735  std::vector<int> ordered_buffer_valid;
736  for(auto ppid_it=order.begin(); ppid_it!=order.end(); ++ppid_it)
737  {
738  if(done.count(*ppid_it)!=0)
739  {
740  std::ostringstream errmsg;
741  errmsg << "Supplied buffer ordering vector contains a duplicate PPIDpair! This is a bug, please report it.";
742  printer_error().raise(LOCAL_INFO, errmsg.str());
743  }
744  ordered_buffer .push_back(buffer .at(*ppid_it));
745  ordered_buffer_valid.push_back(buffer_valid.at(*ppid_it));
746  done.insert(*ppid_it);
747  }
748 
749  // Check if any points were not added to the ordered buffer
750  std::set<PPIDpair> not_done = set_diff(buffer_set,done);
751 
752  if(not_done.size()>0)
753  {
754  std::ostringstream errmsg;
755  errmsg << "Supplied buffer ordering vector does not specify order positions for all points in the buffer! This is a bug, please report it.";
756  printer_error().raise(LOCAL_INFO, errmsg.str());
757  }
758 
759  if(ordered_buffer.size() != buffer.size())
760  {
761  std::ostringstream errmsg;
762  errmsg << "The ordered buffer we just constructed is not the same size as the original buffer! This is a bug, please report it.";
763  printer_error().raise(LOCAL_INFO, errmsg.str());
764  }
765 
766  // Perform dataset writes
767  #ifdef HDF5PRINTER2_DEBUG
769  logger()<<"Writing block of data to disk for dataset "<<dset_name()<<std::endl;
770  logger()<<" Data to write (to target_pos="<<target_pos<<"):"<<std::endl;
771  for(auto it=ordered_buffer.begin(); it!=ordered_buffer.end(); ++it)
772  {
773  logger()<<" "<<*it<<std::endl;
774  }
775  logger()<<EOM;
776  #endif
777 
778  std::size_t newsize = my_dataset .write_vector(loc_id,ordered_buffer ,target_pos);
779  std::size_t newsize_v = my_dataset_valid.write_vector(loc_id,ordered_buffer_valid,target_pos);
780  if(newsize!=newsize_v)
781  {
782  std::ostringstream errmsg;
783  errmsg<<"Inconsistent dataset sizes detected after buffer flush! (newsize="<<newsize<<", newsize_v="<<newsize_v<<")";
784  printer_error().raise(LOCAL_INFO, errmsg.str());
785  }
786 
787  // Clear buffer variables
788  buffer .clear();
789  buffer_valid.clear();
790  buffer_set.clear();
791  }
792 
796  void random_flush(const hid_t loc_id, const std::map<PPIDpair,std::size_t>& position_map)
797  {
798  std::map<std::size_t,T> pos_buffer;
799  std::map<std::size_t,int> pos_buffer_valid;
800 
801  // DEBUG inspect buffer
802  //for(auto it=buffer.begin(); it!=buffer.end(); ++it)
803  //{
804  // std::cout<<"buffer["<<it->first<<"] = "<<it->second<<std::endl;
805  //}
806 
807  for(auto it=position_map.begin(); it!=position_map.end(); ++it)
808  {
809  const PPIDpair& ppid = it->first;
810  const std::size_t& position = it->second;
811  auto bit = buffer .find(ppid);
812  auto vit = buffer_valid.find(ppid);
813  if(bit==buffer.end() or vit==buffer_valid.end())
814  {
815  std::ostringstream errmsg;
816  errmsg<<"Could not find point "<<ppid<<" in buffer! This is a bug, please report it."<<std::endl;
817  printer_error().raise(LOCAL_INFO, errmsg.str());
818  }
819  if(vit->second) // I think there is no reason to write the RA data to disk if it is invalid. Buffers should have been reset if need to clear points.
820  {
821  pos_buffer [position] = bit->second;
822  pos_buffer_valid[position] = vit->second;
823  }
824  // Erase point from buffer
825  buffer .erase(bit);
826  buffer_valid.erase(vit);
827  buffer_set.erase(ppid);
828  }
829  // Perform dataset writes
830  my_dataset .write_random(loc_id, pos_buffer );
831  my_dataset_valid.write_random(loc_id, pos_buffer_valid);
832  }
833 
836  void reset(hid_t loc_id)
837  {
838  if(not is_synchronised())
839  {
840  // Only need to clear the "validity" dataset
841  // Doesn't matter what values are in the main datasets
842  // once they are marked as 'invalid'.
843  buffer .clear();
844  buffer_valid.clear();
845  buffer_set.clear();
846  //my_dataset .reset(loc_id);
847  my_dataset_valid.reset(loc_id);
848  }
849  else
850  {
851  std::ostringstream errmsg;
852  errmsg<<"Reset called on buffer for data label "<<dset_name()<<", however this output stream is marked as 'synchronised'. It therefore cannot be reset! This is a bug, please report it.";
853  printer_error().raise(LOCAL_INFO, errmsg.str());
854  }
855  }
856 
858  void ensure_dataset_exists(const hid_t loc_id, const std::size_t length)
859  {
860  my_dataset .ensure_dataset_exists(loc_id,length);
861  my_dataset_valid.ensure_dataset_exists(loc_id,length);
862  }
863 
865  bool exists_on_disk() const
866  {
867  return my_dataset.get_exists_on_disk();
868  // TODO: Should make sure that 'valid' dataset also exists on disk
869  }
870 
871  // Report the number of items currently in the buffer;
872  std::size_t N_items_in_buffer()
873  {
875  if( buffer.size()!=buffer_set.size()
876  or buffer.size()!=buffer_valid.size())
877  {
878  std::ostringstream errmsg;
879  errmsg<<"Internal inconsistency detected in buffer for dataset "<<dset_name()<<"; the following variables should all be the same size, but are not:"<<std::endl;
880  errmsg<<" buffer .size() = "<<buffer .size()<<std::endl;
881  errmsg<<" buffer_valid.size() = "<<buffer_valid.size()<<std::endl;
882  errmsg<<" buffer_set .size() = "<<buffer_set .size()<<std::endl;
883  printer_error().raise(LOCAL_INFO, errmsg.str());
884  }
885  return buffer.size();
886  }
887 
888 #ifdef WITH_MPI
889  // Send buffer contents to a different process
890  void MPI_flush_to_rank(const unsigned int r)
891  {
892  if(buffer.size()>0)
893  {
894  // Get name of the dataset this buffer is associated with
895  std::string namebuf = dset_name();
896  // Copy point data and values into MPI send buffer
897  std::vector<unsigned long> pointIDs;
898  std::vector<unsigned int> ranks; // Will assume all PPIDpairs are valid. I think this is fine to do...
899  std::vector<int> valid; // We have to send the invalid points too, to maintain buffer synchronicity
900  std::vector<T> values;
901  int type(h5v2_type<T>()); // Get integer identifying the type of the data values
902  int more_buffers = 1; // Flag indicating that there is a block of data to receive
903  for(auto it=buffer.begin(); it!=buffer.end(); ++it)
904  {
905  pointIDs.push_back(it->first.pointID);
906  ranks .push_back(it->first.rank);
907  valid .push_back(buffer_valid.at(it->first));
908  values .push_back(it->second);
909  }
910 
911  // Debug info
912  #ifdef HDF5PRINTER2_DEBUG
913  logger()<<LogTags::printers<<LogTags::debug<<"Sending points for buffer "<<dset_name()<<std::endl
914  <<" (more_buffers: "<<more_buffers<<")"<<std::endl;
915  for(std::size_t i=0; i<buffer.size(); ++i)
916  {
917  logger()<<" Sending point ("<<ranks.at(i)<<", "<<pointIDs.at(i)<<")="<<values.at(i)<<" (valid="<<valid.at(i)<<")"<<std::endl;
918  }
919  logger()<<EOM;
920  #endif
921 
922  // Send the buffers
923  std::size_t Npoints = values.size();
924  myComm.Send(&more_buffers, 1, r, h5v2_BLOCK);
925  //std::cerr<<myComm.Get_rank()<<": sent "<<more_buffers<<std::endl;
926  //send_counter+=1;
927  myComm.Send(&namebuf[0] , namebuf.size(), MPI_CHAR, r, h5v2_bufname);
928  myComm.Send(&type , 1 , r, h5v2_bufdata_type);
929  myComm.Send(&values[0] , Npoints, r, h5v2_bufdata_values);
930  myComm.Send(&pointIDs[0], Npoints, r, h5v2_bufdata_points);
931  myComm.Send(&ranks[0] , Npoints, r, h5v2_bufdata_ranks);
932  myComm.Send(&valid[0] , Npoints, r, h5v2_bufdata_valid);
933 
934  // Clear buffer variables
935  buffer .clear();
936  buffer_valid.clear();
937  buffer_set .clear();
938  }
939  }
940 
941  // Receive buffer contents from a different process
942  // (MasterBuffer should have received the name, type, and size of the
943  // buffer data, and used this to construct/retrieve this buffer.
944  // We then collect the buffer data messages)
945  void MPI_recv_from_rank(unsigned int r, std::size_t Npoints)
946  {
948  std::vector<unsigned long> pointIDs(Npoints);
949  std::vector<unsigned int> ranks(Npoints);
950  std::vector<int> valid(Npoints);
951  std::vector<T> values(Npoints);
952 
953  // Receive buffer data
954  myComm.Recv(&values[0] , Npoints, r, h5v2_bufdata_values);
955  myComm.Recv(&pointIDs[0], Npoints, r, h5v2_bufdata_points);
956  myComm.Recv(&ranks[0] , Npoints, r, h5v2_bufdata_ranks);
957  myComm.Recv(&valid[0] , Npoints, r, h5v2_bufdata_valid);
958 
959  // Pack it into this buffer
960  #ifdef HDF5PRINTER2_DEBUG
961  logger()<<LogTags::printers<<LogTags::debug<<"Adding points to buffer "<<dset_name()<<std::endl;
962  for(std::size_t i=0; i<Npoints; ++i)
963  {
964  // Extra Debug
965  logger()<<" Adding received point ("<<ranks.at(i)<<", "<<pointIDs.at(i)<<")="<<values.at(i)<<" (valid="<<valid.at(i)<<")"<<std::endl;
966  PPIDpair ppid(pointIDs.at(i), ranks.at(i));
967  if(valid.at(i))
968  {
969  append(values.at(i), ppid);
970  }
971  else
972  {
973  update(ppid);
974  }
975  }
976  logger()<<EOM;
977  #endif
978 
979  // Debug info:
980  //std::cout<<"(rank "<<myComm.Get_rank()<<") Final buffer size: "<<N_items_in_buffer()<<" (Npoints was: "<<Npoints<<"), dset="<<dset_name()<<std::endl;
981  }
982 #endif
983 
984  void add_float_block(const HDF5bufferchunk& chunk, const std::size_t buf)
985  {
986  // Pack it into this buffer
987  #ifdef HDF5PRINTER2_DEBUG
988  logger()<<LogTags::printers<<LogTags::debug<<"Adding 'float type' points to buffer "<<dset_name()<<std::endl;
989  #endif
990  for(std::size_t i=0; i<chunk.used_size; ++i)
991  {
992  bool valid = chunk.valid[buf][i];
993  PPIDpair ppid(chunk.pointIDs[i], chunk.ranks[i]);
994  if(valid)
995  {
996  T value = static_cast<T>(chunk.values[buf][i]);
997  #ifdef HDF5PRINTER2_DEBUG
998  logger()<<" Adding valid point (rank="<<chunk.ranks[i]<<", pointID="<<chunk.pointIDs[i]<<", value="<<value<<")"<<std::endl;
999  #endif
1000  append(value, ppid);
1001  }
1002  else
1003  {
1004  #ifdef HDF5PRINTER2_DEBUG
1005  logger()<<" Updating with invalid point (rank="<<chunk.ranks[i]<<", pointID="<<chunk.pointIDs[i]<<")"<<std::endl;
1006  #endif
1007  update(ppid);
1008  }
1009  }
1010  #ifdef HDF5PRINTER2_DEBUG
1011  logger()<<EOM;
1012  #endif
1013  }
1014 
1015  void add_int_block(const HDF5bufferchunk& chunk, const std::size_t buf)
1016  {
1017  // Pack it into this buffer
1018  #ifdef HDF5PRINTER2_DEBUG
1019  logger()<<LogTags::printers<<LogTags::debug<<"Adding 'int type' points (from chunk["<<buf<<"] with name ID "<<chunk.name_id[buf]<<") to buffer "<<dset_name()<<std::endl;
1020  #endif
1021  for(std::size_t i=0; i<chunk.used_size; ++i)
1022  {
1023  bool valid = chunk.valid[buf][i];
1024  PPIDpair ppid(chunk.pointIDs[i], chunk.ranks[i]);
1025  if(valid)
1026  {
1027  T value = static_cast<T>(chunk.values_int[buf][i]);
1028  #ifdef HDF5PRINTER2_DEBUG
1029  logger()<<" Adding valid point (rank="<<chunk.ranks[i]<<", pointID="<<chunk.pointIDs[i]<<", value="<<value<<")"<<std::endl;
1030  #endif
1031  append(value, ppid);
1032  }
1033  else
1034  {
1035  #ifdef HDF5PRINTER2_DEBUG
1036  logger()<<" Updating with invalid point (rank="<<chunk.ranks[i]<<", pointID="<<chunk.pointIDs[i]<<")"<<std::endl;
1037  #endif
1038  update(ppid);
1039  }
1040  }
1041  #ifdef HDF5PRINTER2_DEBUG
1042  logger()<<EOM;
1043  #endif
1044 
1045  // // Super debug; check entire buffer contents
1046  // logger()<<LogTags::printers<<LogTags::debug;
1047  // logger()<<"Checking buffer contents for dataset "<<dset_name()<<std::endl;
1048  // for(auto it=buffer.begin(); it!=buffer.end(); ++it)
1049  // {
1050  // logger()<<" "<<it->first<<", "<<it->second<<std::endl;
1051  // }
1052  // logger()<<EOM;
1053 
1054  }
1055 
1056  // Retrieve buffer data in specified order (removing the points specified in 'order' from the buffer)
1057  // Points not in the buffer are returned as "invalid"
1058  // As a double.
1059  std::pair<std::vector<double>,std::vector<int>> flush_to_vector_dbl(const std::vector<PPIDpair>& order)
1060  {
1061  std::vector<double> out_values;
1062  std::vector<int> out_valid;
1063  for(auto it=order.begin(); it!=order.end(); ++it)
1064  {
1065  if(buffer_set.find(*it)!=buffer_set.end())
1066  {
1067  // Add to output vector
1068  out_values.push_back((double)buffer.at(*it));
1069  out_valid .push_back(buffer_valid.at(*it));
1070  // Remove from buffer
1071  buffer_set .erase(*it);
1072  buffer .erase(*it);
1073  buffer_valid.erase(*it);
1074  }
1075  else
1076  {
1077  out_values.push_back(0);
1078  out_valid .push_back(0);
1079  }
1080  }
1081  return std::make_pair(out_values,out_valid);
1082  }
1083 
1084  // int version
1085  std::pair<std::vector<long>,std::vector<int>> flush_to_vector_int(const std::vector<PPIDpair>& order)
1086  {
1087  std::vector<long> out_values;
1088  std::vector<int> out_valid;
1089  for(auto it=order.begin(); it!=order.end(); ++it)
1090  {
1091  if(buffer_set.find(*it)!=buffer_set.end())
1092  {
1093  // Add to output vector
1094  out_values.push_back((long)buffer.at(*it));
1095  out_valid .push_back(buffer_valid.at(*it));
1096  // Remove from buffer
1097  buffer_set .erase(*it);
1098  buffer .erase(*it);
1099  buffer_valid.erase(*it);
1100  }
1101  else
1102  {
1103  out_values.push_back(0);
1104  out_valid .push_back(0);
1105  }
1106  }
1107  return std::make_pair(out_values,out_valid);
1108  }
1109 
1111  int get_type_id() const
1112  {
1113  return my_dataset.get_type_id();
1114  }
1115 
1116  private:
1117 
1121 
1123  std::map<PPIDpair,T> buffer;
1124 
1125 #ifdef WITH_MPI
1126  // Gambit MPI communicator context for use within the hdf5 printer system
1127  GMPI::Comm& myComm;
1128 #endif
1129  };
1130 
1132  template<class T>
1134  {
1135  public:
1136 
1139 #ifdef WITH_MPI
1140  , GMPI::Comm& comm
1141 #endif
1142  ) : synchronised(sync)
1143 #ifdef WITH_MPI
1144  , myComm(comm)
1145 #endif
1146  {}
1147 
1149  // Currently buffered points need to be supplied in case we have to create and fill a new buffer
1150  HDF5Buffer<T>& get_buffer(const std::string& label, const std::vector<PPIDpair>& buffered_points)
1151  {
1152  auto it=my_buffers.find(label);
1153  if(it==my_buffers.end())
1154  {
1155  // No buffer with this name. Need to create one!
1156  my_buffers.emplace(label,HDF5Buffer<T>(label,synchronised,buffered_points
1157 #ifdef WITH_MPI
1158  , myComm
1159 #endif
1160  ));
1161  it=my_buffers.find(label);
1162  }
1163  return it->second;
1164  }
1165 
1166  private:
1167 
1168  std::map<std::string,HDF5Buffer<T>> my_buffers;
1170 #ifdef WITH_MPI
1171  // Gambit MPI communicator context for use within the hdf5 printer system
1172  GMPI::Comm& myComm;
1173 #endif
1174  };
1175 
1179  {
1180 
1181  public:
1182 
1184  HDF5MasterBuffer(const std::string& filename, const std::string& groupname, const bool sync, const std::size_t buffer_length
1185 #ifdef WITH_MPI
1186  , GMPI::Comm& comm
1187 #endif
1188  );
1189 
1191  ~HDF5MasterBuffer();
1192 
1194  template<class T>
1195  void schedule_print(T const& value, const std::string& label, const unsigned int mpirank, const unsigned long pointID)
1196  {
1198  PPIDpair thispoint(pointID,mpirank);
1199  auto it = buffered_points_set.find(thispoint);
1200  if(it==buffered_points_set.end())
1201  {
1203  if(buffered_points.size() != buffered_points_set.size())
1204  {
1205  std::stringstream msg;
1206  msg<<"Inconsistency detected between buffered_points and buffered_points_set sizes ("<<buffered_points.size()<<" vs "<<buffered_points_set.size()<<")! This is a bug, please report it."<<std::endl;
1207  printer_error().raise(LOCAL_INFO,msg.str());
1208  }
1209 
1211  if(is_synchronised() and buffered_points.size()>get_buffer_length())
1212  {
1214  std::stringstream msg;
1215  msg<<"The allowed sync buffer size has somehow been exceeded! Buffers should have been flushed when they were full. This is a bug, please report it.";
1216  printer_error().raise(LOCAL_INFO,msg.str());
1217  }
1218  else if(buffered_points.size()==get_buffer_length())
1219  {
1220  // Buffer full, flush it out
1221  flush();
1222  }
1223  else if(not is_synchronised() and buffered_points.size()>get_buffer_length())
1224  {
1226 
1228  if((buffered_points.size()%1000)==0)
1229  {
1230  flush();
1231 
1232  std::stringstream msg;
1233  msg<<"The number of unflushable points in the non-synchronised print buffers is getting large (current buffer length is "<<buffered_points.size()<<"; soft max limit was "<<get_buffer_length()<<"). This may indicate that some process has not been properly printing the synchronised points that it is computing. If nothing changes this process may run out of RAM for the printer buffers and crash.";
1234  printer_warning().raise(LOCAL_INFO,msg.str());
1235  }
1236  }
1237 
1238  // Inform all buffers of this new point
1239  update_all_buffers(thispoint);
1240  // DEBUG
1241  //std::cout<<"Adding point to buffered_points list: "<<thispoint<<std::endl;
1242  buffered_points.push_back(thispoint);
1243  buffered_points_set.insert(thispoint);
1244  }
1245 
1246  // Add the new data to the buffer
1247  get_buffer<T>(label,buffered_points).append(value,thispoint);
1248  }
1249 
1251  void flush();
1252 
1253  #ifdef WITH_MPI
1254  void MPI_flush_to_rank(const unsigned int rank);
1257 
1259  void MPI_request_buffer_data(const unsigned int rank);
1260 
1262  void MPI_recv_all_buffers(const unsigned int rank);
1263 
1266  template<class T>
1267  int MPI_recv_buffer(const unsigned int r, const std::string& dset_name)
1268  {
1269  // Get number of points to be received
1270  MPI_Status status;
1271  myComm.Probe(r, h5v2_bufdata_values, &status);
1272  int Npoints;
1273  int err = MPI_Get_count(&status, GMPI::get_mpi_data_type<T>::type(), &Npoints);
1274  if(err<0)
1275  {
1276  std::stringstream msg;
1277  msg<<"Error from MPI_Get_count while attempting to receive buffer data from rank "<<r<<" for dataset "<<dset_name<<"!";
1278  printer_error().raise(LOCAL_INFO,msg.str());
1279  }
1280  HDF5Buffer<T>& buffer = get_buffer<T>(dset_name, buffered_points);
1281  //std::cout<<"(rank "<<myComm.Get_rank()<<") Npoints: "<<Npoints<<std::endl;
1282  buffer.MPI_recv_from_rank(r, Npoints);
1283  logger()<< LogTags::printers << LogTags::debug << "Received "<<Npoints<<" points from rank "<<r<<"'s buffers (for dataset: "<<dset_name<<")"<<EOM;
1284  //std::cout<<"(rank "<<myComm.Get_rank()<<") Received "<<Npoints<<" from rank "<<r<<". New buffer size is "<<buffer.N_items_in_buffer()<<" (name="<<buffer.dset_name()<<")"<<std::endl;
1285  return Npoints;
1286  }
1287 
1289  template<class T>
1290  void MPI_add_int_block_to_buffer(const HDF5bufferchunk& chunk, const std::string& dset_name, const std::size_t dset_index)
1291  {
1292  HDF5Buffer<T>& buffer = get_buffer<T>(dset_name, buffered_points);
1293  buffer.add_int_block(chunk,dset_index);
1294  }
1295 
1296  template<class T>
1297  void MPI_add_float_block_to_buffer(const HDF5bufferchunk& chunk, const std::string& dset_name, const std::size_t dset_index)
1298  {
1299  HDF5Buffer<T>& buffer = get_buffer<T>(dset_name, buffered_points);
1300  buffer.add_float_block(chunk,dset_index);
1301  }
1302 
1303  // Add a vector of buffer chunk data to the buffers managed by this object
1304  void add_to_buffers(const std::vector<HDF5bufferchunk>& blocks, const std::vector<std::pair<std::string,int>>& buf_types);
1305 
1306  #endif
1307 
1309  void reset();
1310 
1312  void resynchronise();
1313 
1315  bool all_buffers_empty();
1316 
1318  bool is_synchronised();
1319 
1321  std::string buffer_status();
1322 
1324  std::string get_file();
1325 
1327  std::string get_group();
1328 
1330  std::size_t get_buffer_length();
1331 
1333  std::size_t get_Npoints();
1334 
1336  void extend_all_datasets_to(const std::size_t length);
1337 
1339  std::map<ulong, ulong> get_highest_PPIDs(const int mpisize);
1340 
1342  void lock_and_open_file(const char access_type='w'); // read/write allowed by default
1343 
1345  void close_and_unlock_file();
1346 
1348  hid_t get_location_id();
1349 
1351  std::size_t get_next_free_position();
1352 
1354  std::size_t get_Nbuffers();
1355 
1357  double get_sizeMB();
1358 
1360  std::vector<std::pair<std::string,int>> get_all_dset_names_on_disk();
1361 
1363  const std::map<std::string,HDF5BufferBase*>& get_all_buffers();
1364 
1366  const std::set<PPIDpair>& get_all_points();
1367 
1369  // (only intended to be used when points have been removed from buffers by e.g. MPI-related
1370  // routines like flush_to_vector)
1371  void untrack_points(const std::set<PPIDpair>& removed_points);
1372 
1373  private:
1374 
1376  std::map<std::string,HDF5BufferBase*> all_buffers;
1377 
1382  std::vector<PPIDpair> buffered_points;
1383 
1386  std::set<PPIDpair> buffered_points_set;
1387 
1390 
1392  std::size_t buffer_length;
1393 
1395  template<class T>
1396  HDF5Buffer<T>& get_buffer(const std::string& label, const std::vector<PPIDpair>& buffered_points);
1397 
1399  void update_buffer_map(const std::string& label, HDF5BufferBase& buff);
1400 
1404  void update_all_buffers(const PPIDpair& ppid);
1405 
1407  std::map<PPIDpair,std::size_t> get_position_map(const std::vector<PPIDpair>& buffer) const;
1408 
1410  std::string file; // Output HDF5 file
1411  std::string group; // HDF5 group location to store datasets
1412 
1413  // Handles for HDF5 files and groups containing the datasets
1416 
1417  // Handle to a location in a HDF5 to which the datasets will be written
1418  // i.e. a file or a group.
1420 
1422  Utils::FileLock hdf5out;
1423 
1424  // Flag to register whether HDF5 file is open
1426 
1427  // Flag to register whether HDF5 file is locked for us to use
1429 
1431  void ensure_file_is_open() const;
1432 
1434  // Need a map for every directly printable type, and a specialisation
1435  // of 'get_buffer' to access it.
1440  //HDF5MasterBufferT<longlong > hdf5_buffers_longlong;
1441  //HDF5MasterBufferT<ulonglong> hdf5_buffers_ulonglong;
1444 
1445 #ifdef WITH_MPI
1446  // Gambit MPI communicator context for use within the hdf5 printer system
1447  GMPI::Comm& myComm;
1448 #endif
1449 
1450  };
1451 
1453  template<> HDF5Buffer<int >& HDF5MasterBuffer::get_buffer<int >(const std::string& label, const std::vector<PPIDpair>&);
1454  template<> HDF5Buffer<uint >& HDF5MasterBuffer::get_buffer<uint >(const std::string& label, const std::vector<PPIDpair>&);
1455  template<> HDF5Buffer<long >& HDF5MasterBuffer::get_buffer<long >(const std::string& label, const std::vector<PPIDpair>&);
1456  template<> HDF5Buffer<ulong >& HDF5MasterBuffer::get_buffer<ulong >(const std::string& label, const std::vector<PPIDpair>&);
1457  //template<> HDF5Buffer<longlong >& HDF5MasterBuffer::get_buffer<longlong >(const std::string& label, const std::vector<PPIDpair>&);
1458  //template<> HDF5Buffer<ulonglong>& HDF5MasterBuffer::get_buffer<ulonglong>(const std::string& label, const std::vector<PPIDpair>&);
1459  template<> HDF5Buffer<float >& HDF5MasterBuffer::get_buffer<float >(const std::string& label, const std::vector<PPIDpair>&);
1460  template<> HDF5Buffer<double >& HDF5MasterBuffer::get_buffer<double >(const std::string& label, const std::vector<PPIDpair>&);
1461 
1463  class HDF5Printer2: public BasePrinter
1464  {
1465  public:
1467  HDF5Printer2(const Options& options, BasePrinter* const primary = NULL);
1468 
1470  ~HDF5Printer2();
1471 
1473  std::string get_filename();
1474 
1476  std::string get_groupname();
1477 
1479  std::size_t get_buffer_length();
1480 
1484 
1485  // Initialisation function
1486  // Run by dependency resolver, which supplies the functors with a vector of VertexIDs whose requiresPrinting flags are set to true.
1487  void initialise(const std::vector<int>&);
1488  void flush();
1489  void reset(bool force=false);
1490  void finalise(bool abnormal=false);
1491 
1492  // Get options required to construct a reader object that can read
1493  // the previous output of this printer.
1494  Options resume_reader_options();
1495 
1497 
1499  using BasePrinter::_print; // Tell compiler we are using some of the base class overloads of this on purpose.
1500  #define DECLARE_PRINT(r,data,i,elem) void _print(elem const&, const std::string&, const int, const uint, const ulong);
1502  #ifndef SCANNER_STANDALONE
1504  #endif
1505  #undef DECLARE_PRINT
1506 
1509  void add_aux_buffer(HDF5MasterBuffer& aux_buffermaster);
1510 
1513  HDF5Printer2* get_HDF5_primary_printer();
1514 
1515 #ifdef WITH_MPI
1516  GMPI::Comm& get_Comm();
1518 #endif
1519 
1520  private:
1521 
1523  HDF5Printer2* link_primary_printer(BasePrinter* const primary);
1524 
1527 
1528  std::size_t myRank;
1529  std::size_t mpiSize;
1530 
1531 #ifdef WITH_MPI
1532  GMPI::Comm myComm; // initially attaches to MPI_COMM_WORLD
1534 
1536  std::pair<std::map<std::string,int>,std::vector<std::pair<std::string,int>>> get_buffer_idcodes(const std::vector<HDF5MasterBuffer*>& masterbuffers);
1537 
1539  void gather_and_print(HDF5MasterBuffer& out_printbuffer, const std::vector<HDF5MasterBuffer*>& masterbuffers, bool sync);
1540 
1541  // Gather (via MPI) all HDF5 buffer chunk data from a set of managed buffers
1542  std::vector<HDF5bufferchunk> gather_all(GMPI::Comm& comm, const std::vector<HDF5MasterBuffer*>& masterbuffers, const std::map<std::string,int>& buf_ids);
1543 
1544  static constexpr double RAMlimit = 500; // MB; dump data if buffer size exceeds this
1545  static constexpr std::size_t MAXrecv = 100; // Maximum number of processes to send buffer data at one time
1546 
1547 #endif
1548 
1551 
1554  std::vector<HDF5MasterBuffer*> aux_buffers;
1555 
1557  std::string get_filename(const Options& options);
1558 
1560  std::string get_groupname(const Options& options);
1561 
1563  std::size_t get_buffer_length(const Options& options);
1564 
1566  std::map<ulong, ulong> get_highest_PPIDs_from_HDF5();
1567 
1570  void check_consistency(bool attempt_repair);
1571 
1573  bool get_sync(const Options& options);
1574 
1576  // Used to reduce repetition in definitions of virtual function overloads
1577  // (useful since there is no automatic type conversion possible)
1578  template<class T>
1579  void basic_print(T const& value, const std::string& label, const unsigned int mpirank, const unsigned long pointID)
1580  {
1581  // Forward the print information on to the master buffer manager object
1582  buffermaster.schedule_print<T>(value,label,mpirank,pointID);
1583  }
1584 
1585  };
1586 
1587  // Register printer so it can be constructed via inifile instructions
1588  // First argument is string label for inifile access, second is class from which to construct printer
1589  LOAD_PRINTER(hdf5, HDF5Printer2)
1590 
1591  }
1592 }
1593 
1594 #endif
void ensure_dataset_is_open() const
Enforce that the dataset must be open for whatever follows (or else an error is thrown) ...
void schedule_print(T const &value, const std::string &label, const unsigned int mpirank, const unsigned long pointID)
Queue up data to be written to disk when buffers are full.
std::size_t write_vector(const hid_t loc_id, const std::vector< T > &data, const std::size_t target_pos, const bool force=false)
Write a vector of data to disk at the target position.
void add_int_block(const HDF5bufferchunk &chunk, const std::size_t buf)
Class to manage buffer for a single output label.
greatScanData data
Definition: great.cpp:38
HDF5MasterBufferT(bool sync)
Constructor.
The main printer class for output to HDF5 format.
void close_dataset()
Close dataset on disk and release handles.
void reset(hid_t loc_id)
Clear all data in the buffer and on disk Only allowed for "random access" buffers.
EXPORT_SYMBOLS error & printer_error()
Printer errors.
HDF5Printer2 * primary_printer
Pointer to primary printer object.
void basic_print(T const &value, const std::string &label, const unsigned int mpirank, const unsigned long pointID)
Helper print function.
void open_dataset(hid_t location_id)
Open dataset on disk and obtain HDF5 handles.
HDF5DataSet< int > my_dataset_valid
std::string myname() const
Retrieve name of the dataset we are supposed to access.
std::vector< T > get_chunk(std::size_t offset, std::size_t length) const
Extract a data slice from the linked dataset.
HDF5MasterBufferT< int > hdf5_buffers_int
Buffer manager objects.
const int h5v2_bufdata_values(15)
void add_float_block(const HDF5bufferchunk &chunk, const std::size_t buf)
#define LOCAL_INFO
Definition: local_info.hpp:34
GAMBIT file locking functions Use these to block access to sensitive parts of the code by other proce...
bool get_exists_on_disk() const
Variable tracking whether the dataset is known to exist in the output file yet.
virtual void create_dataset(hid_t location_id)=0
Create a new dataset at the specified location (implemented in derived class since need to know the t...
std::map< std::string, HDF5BufferBase * > all_buffers
Map containing pointers to all buffers managed by this object;.
Constructable class for doing basic operations on a HDF5 dataset.
std::pair< hid_t, hid_t > select_hyperslab(std::size_t offset, std::size_t length) const
Obtain memory and dataspace identifiers for writing to a hyperslab in the dataset.
HDF5DataSet< T > my_dataset
Object that provides an interface to the output HDF5 dataset matching this buffer.
HDF5DataSetBase(const std::string &name, const hid_t hdftype_id)
HDF5DataSetBase member functions.
Logging access header for GAMBIT.
HDF5MasterBuffer buffermaster
Object interfacing to HDF5 file and all datasets.
BOOST_PP_SEQ_FOR_EACH_I(GETTYPEID, _, PRINTABLE_TYPES) void printAllTypeIDs(void)
For debugging; print to stdout all the typeIDs for all types.
Definition: baseprinter.cpp:36
Definitions of new MPI datatypes needed by printers.
unsigned long long int ulonglong
const int h5v2_bufdata_type(14)
Declarations for the YAML options class.
std::pair< std::vector< long >, std::vector< int > > flush_to_vector_int(const std::vector< PPIDpair > &order)
HDF5Buffer(const std::string &name, const bool sync, const std::vector< PPIDpair > &buffered_points)
Constructor.
HDF5MasterBufferT< ulong > hdf5_buffers_ulong
const int h5v2_bufdata_points(11)
HDF5MasterBufferT< uint > hdf5_buffers_uint
std::set< PPIDpair > buffer_set
Set detailing what points are in the buffer.
hid_t get_hdftype_id() const
Retrieve the HDF5 type ID for this dataset.
std::size_t get_dset_length() const
Retrieve the current size of the dataset on disk.
void random_flush(const hid_t loc_id, const std::map< PPIDpair, std::size_t > &position_map)
Empty the buffer to disk as "random access" data at pre-existing positions matching the point IDs May...
bool dataset_exists(const hid_t loc_id)
Check if our dataset exists on disk with the required name at the given location. ...
Class for interfacing to a HDF5 dataset of fixed type.
std::set< PPIDpair > buffered_points_set
We also need a set of the buffered points, so we can do fast lookup of whether a point is in the buff...
void ensure_dataset_exists(const hid_t loc_id, const std::size_t length)
Make sure datasets exist on disk with the correct name and size.
const int h5v2_bufdata_ranks(12)
hid_t hdftype_id
HDF5 type ID for this dataset.
HDF5DataSet(const std::string &name)
Constructor.
HDF5Buffer< T > & get_buffer(const std::string &label, const std::vector< PPIDpair > &buffered_points)
Retrieve buffer of our type for a given label.
void update(const PPIDpair &ppid)
Make sure buffer includes the specified point (data will be set as &#39;invalid&#39; unless given elsewhere) ...
std::vector< HDF5MasterBuffer * > aux_buffers
Vector of pointers to master buffer objects for auxilliary printers Only the primary printer will hav...
void append(T const &value, const PPIDpair &ppid)
Insert data to print buffer at the specified point (overwrite if it already exists in the buffer) ...
std::pair< std::vector< double >, std::vector< int > > flush_to_vector_dbl(const std::vector< PPIDpair > &order)
const Logging::endofmessage EOM
Explicit const instance of the end of message struct in Gambit namespace.
Definition: logger.hpp:100
const int h5v2_BLOCK(30)
HDF5MasterBufferT< long > hdf5_buffers_long
bool synchronised
Flag to specify what sort of buffer this manager is supposed to be managing.
std::map< PPIDpair, int > buffer_valid
Buffer specifying whether the data in the primary buffer is "valid".
Utils::FileLock hdf5out
File locking object for the output hdf5 file.
EXPORT_SYMBOLS Logging::LogMaster & logger()
Function to retrieve a reference to the Gambit global log object.
Definition: logger.cpp:95
const int h5v2_bufdata_valid(13)
const int h5v2_bufname(10)
MPI tags for HDF5 printer v2.
Declaration and definition of printer base class.
START_MODEL dNur_CMB r
void block_flush(const hid_t loc_id, const std::vector< PPIDpair > &order, const std::size_t target_pos)
Empty the buffer to disk as block with the specified order into the target position (only allowed if ...
bool is_open
Flag to let us known if the dataset is open.
HDF5MasterBufferT< float > hdf5_buffers_float
std::vector< PPIDpair > buffered_points
Vector of PPIDpairs that are currently stored in the printer buffers This also defines the order in w...
int get_type_id() const
Retrieve the integer type ID for the buffered dataset.
bool synchronised
Flag to tell us whether this buffer should perform block writes to the output dataset, or look up and overwrite existing points.
const int h5v2_BEGIN(31)
#define LOAD_PRINTER(tag,...)
Definition: baseprinter.hpp:57
void create_dataset(hid_t location_id)
Create a new dataset at the specified location.
int type_id
Integer identifier for the template type of this dataset (determined by derived type) ...
long long values_int[NBUFFERS][SIZE]
unsigned long int ulong
std::map< std::string, HDF5Buffer< T > > my_buffers
A simple C++ wrapper for the MPI C bindings.
Base class for buffers.
bool exists_on_disk
Variable tracking whether the dataset is known to exist in the output file yet.
std::string _dset_name
Name of dataset for which this object is the buffer.
EXPORT_SYMBOLS warning & printer_warning()
Printer warnings.
void write_RA_buffer(const T(&buffer)[MAX_BUFFER_SIZE], const hsize_t(&coords)[MAX_BUFFER_SIZE], std::size_t npoints)
Write a buffer of data to disk at the specified positions (must be within current dataset extents) ...
std::string file
Output file variales.
std::string _myname
Name of the dataset in the hdf5 file.
Class to manage a set of buffers for a single output type.
bool exists_on_disk() const
Report whether the dataset for which we are the buffer exists on disk yet.
hid_t dset_id
HDF5 dataset identifer.
std::set< T > set_diff(const std::set< T > &set1, const std::set< T > &set2)
std::map< PPIDpair, T > buffer
Buffer containing points to be written to disk upon "flush".
virtual ~HDF5BufferBase()
Destructor.
hid_t get_dset_id() const
Retrieve the dataset ID for the currently open dataset.
#define DECLARE_PRINT(r, data, i, elem)
Sequence of all types printable by the HDF5 printer.
void ensure_dataset_exists(const hid_t loc_id, const std::size_t length)
Ensure that a correctly named dataset exists at the target location with the specified length...
void reset(hid_t loc_id)
Clear all data on disk for this dataset Note; this just sets all values to defaults, it doesn&#39;t delete or resize the dataset.
void write_random(const hid_t loc_id, const std::map< std::size_t, T > &data)
Write data to disk at specified positions.
Base class for interfacing to a HDF5 dataset.
void write_buffer(const T(&buffer)[MAX_BUFFER_SIZE], const std::size_t length, const std::size_t target_pos, const bool force=false)
Write a block of data to disk at the end of the dataset This is the lower-level function.
A collection of tools for interacting with HDF5 databases.
#define HDF5_TYPES
Definition: hdf5types.hpp:25
#define HDF5_BACKEND_TYPES
Definition: hdf5types.hpp:46
Base template is left undefined in order to raise a compile error if specialisation doesn&#39;t exist...
Definition: hdf5tools.hpp:77
pointID / process number pair Used to identify a single parameter space point
TODO: see if we can use this one:
Definition: Analysis.hpp:33
A small wrapper object for &#39;options&#39; nodes.
Class to manage all buffers for a given printer object Also handles the file locking/access to the ou...
HDF5MasterBufferT< double > hdf5_buffers_double
virtual ~HDF5DataSetBase()
Destructor.
void extend_dset_to(const std::size_t new_size)
Extend dataset to the specified size, filling it with default values.
long long int longlong
Concatenation macros.
int get_type_id() const
Retrieve the integer type ID for this dataset.
std::size_t buffer_length
Max allowed size of buffer.
void extend_dset_by(const std::size_t extend_by)
Set the variable that tracks the (virtual) dataset size on disk.