gambit is hosted by Hepforge, IPPP Durham
GAMBIT  v1.5.0-252-gf9a3f78
a Global And Modular Bsm Inference Tool
hdf5printer_v2.hpp
Go to the documentation of this file.
1 // GAMBIT: Global and Modular BSM Inference Tool
2 // *********************************************
30 
31 
32 #ifndef __hdf5printer_v2_hpp__
33 #define __hdf5printer_v2_hpp__
34 
#include <algorithm>
#include <iterator>
#include <memory>
#include <set>
#include <string>
39 
40 // BOOST_PP
41 #include <boost/preprocessor/seq/for_each_i.hpp>
42 
43 // GAMBIT
47 #include "gambit/Utils/cats.hpp"
52 #include "gambit/Logs/logger.hpp"
53 
54 // Activate extra debug logging (warning, LOTS of output)
55 //#define HDF5PRINTER2_DEBUG
56 
57 namespace Gambit
58 {
59  namespace Printers
60  {
61 
// Short aliases for the integer types handled by this printer.
using uint      = unsigned int;
using ulong     = unsigned long;
using longlong  = long long;
using ulonglong = unsigned long long;

// DEBUG h5v2_BLOCK message counters
//static int recv_counter;
//static int send_counter;

/// Chunk length passed to H5Pset_chunk when an output dataset is created.
static constexpr std::size_t HDF5_CHUNKLENGTH = 100;

/// Rank of every output dataset (all datasets are one-dimensional).
static constexpr std::size_t DSETRANK = 1;

/// Maximum number of elements staged per block / random-access write.
static constexpr std::size_t MAX_BUFFER_SIZE = 100000;

/// MPI message tags used when shipping buffer contents between ranks.
constexpr int h5v2_bufname        = 10;
constexpr int h5v2_bufdata_points = 11;
constexpr int h5v2_bufdata_ranks  = 12;
constexpr int h5v2_bufdata_valid  = 13;
constexpr int h5v2_bufdata_type   = 14;
constexpr int h5v2_bufdata_values = 15;

/// Message block end tag:
///   1 means "there is another block of messages to receive"
///   0 means "there are no more blocks of messages to receive"
constexpr int h5v2_BLOCK = 30;

/// "Begin sending data" tag
constexpr int h5v2_BEGIN = 31;

// The 'h5v2_bufdata_type' messages carry an integer encoding the datatype of
// the accompanying h5v2_bufdata_values messages, so each supported type needs
// a unique integer. That encoding is provided by the h5v2_type<T>() template
// function (used by MPI_flush_to_rank).

/// Return the elements of @p set1 that are not present in @p set2.
///
/// Works for any std::set specialisation: the result uses set1's comparator
/// and allocator, and std::set_difference is given the same comparator.
/// Both inputs must share an ordering, because set_difference requires the
/// two ranges to be sorted by the comparator it is given. Plain
/// std::set<T> callers behave exactly as before.
///
/// @param set1 set to subtract from
/// @param set2 elements to remove
/// @return set1 \ set2
template<class T, class Compare = std::less<T>, class Alloc = std::allocator<T> >
std::set<T,Compare,Alloc> set_diff(const std::set<T,Compare,Alloc>& set1, const std::set<T,Compare,Alloc>& set2)
{
  std::set<T,Compare,Alloc> result(set1.key_comp(), set1.get_allocator());
  std::set_difference(set1.begin(), set1.end(), set2.begin(), set2.end(),
                      std::inserter(result, result.end()), set1.key_comp());
  return result;
}
108 
// Body of HDF5DataSetBase: common base for a single 1-D (DSETRANK) HDF5
// output dataset. The class-head line is not captured in this view.
// HDF5DataSet<T> derives from it and calls the (name, hdftype_id) constructor.
111  {
112  public:
// 'hdftype_id' is the HDF5 element type; HDF5DataSet<T> passes
// get_hdf5_data_type<T>::type(). NOTE(review): how the name-only
// constructor chooses a type is not visible here.
113  HDF5DataSetBase(const std::string& name, const hid_t hdftype_id);
114  HDF5DataSetBase(const std::string& name);
115  virtual ~HDF5DataSetBase();
116 
// open_dataset/close_dataset bracket every disk access in HDF5DataSet<T>
// (e.g. write_vector, write_random, reset).
118  void open_dataset(hid_t location_id);
119 
121  void close_dataset();
122 
// Implemented by derived classes to create the on-disk dataset under location_id.
125  virtual void create_dataset(hid_t location_id) = 0;
126 
// Current dataset length; write_buffer treats target indices below this as
// already occupied.
128  std::size_t get_dset_length() const;
129 
// Used by HDF5DataSet<T>::reset to skip datasets that were never written.
131  bool dataset_exists(const hid_t loc_id);
132 
// NOTE(review): exact semantics of 'length' not visible here; presumably
// creates the dataset if needed and makes it at least 'length' long.
134  void ensure_dataset_exists(const hid_t loc_id, const std::size_t length);
135 
// Resize the dataset; write_buffer calls this with target_pos+length.
137  void extend_dset_to(const std::size_t new_size);
138 
// Dataset name (used in all error messages and as the H5Dcreate2 name).
140  std::string myname() const;
141 
// NOTE(review): presumably returns the protected 'type_id' member; confirm.
143  int get_type_id() const;
144 
// HDF5 type id compared against the on-disk type before every write.
146  hid_t get_hdftype_id() const;
147 
// Flag queried by HDF5Buffer<T>::exists_on_disk().
149  bool get_exists_on_disk() const;
150  void set_exists_on_disk();
151 
152  private:
153 
154  // Dataset and chunk dimension specification arrays
155  // We are only using 1D output datasets for simplicity.
156  // Values are only valid if 'is_open==true'
157  hsize_t dims [DSETRANK];
158  hsize_t maxdims [DSETRANK];
159  hsize_t chunkdims[DSETRANK];
160  //hsize_t slicedims[DSETRANK];
161  // Note, dims[0] is current size of dataset, so next unused index is equal to dims[0]
162 
164  std::string _myname;
165 
167  bool is_open;
168 
// NOTE(review): role of virtual_dset_length is not visible in this view.
170  std::size_t virtual_dset_length;
171 
174 
175  protected:
176 
179 
181  void ensure_dataset_is_open() const;
182 
// Handle of the open dataset (used with H5Dget_type, H5Dwrite, H5Dread).
184  hid_t get_dset_id() const;
185 
187  //void set_dset_length(const std::size_t newsize);
188 
190  void extend_dset_by(const std::size_t extend_by);
191 
// Select [offset, offset+length) in the file. Returns (memspace_id, dspace_id);
// callers release both with H5Sclose.
193  std::pair<hid_t,hid_t> select_hyperslab(std::size_t offset, std::size_t length) const;
194 
197 
199  int type_id;
200  };
201 
// Body of HDF5DataSetBasic (class-head line not captured in this view;
// presumably derives from HDF5DataSetBase - TODO confirm). Unlike
// HDF5DataSet<T> it is not templated on an element type; it takes only a
// dataset name and supplies its own create_dataset().
204  {
205  public:
206  HDF5DataSetBasic(const std::string& name);
207  void create_dataset(hid_t location_id);
208  };
209 
211  template<class T>
213  {
214  public:
215 
217  HDF5DataSet(const std::string& name)
218  : HDF5DataSetBase(name,get_hdf5_data_type<T>::type())
219  {}
220 
222  std::size_t write_vector(const hid_t loc_id, const std::vector<T>& data, const std::size_t target_pos, const bool force=false)
223  {
224  open_dataset(loc_id);
225  bool all_data_written=false;
226  T buffer[MAX_BUFFER_SIZE];
227  std::size_t i = 0;
228  std::size_t offset = target_pos;
229  //std::cout<<"Preparing to write "<<data.size()<<" elements to dataset "<<myname()<<" at position "<<target_pos<<std::endl;
230  while(not all_data_written)
231  {
232  std::size_t j;
233  // Copy data into buffer up to MAX_BUFFER_SIZE
234  for(j=0;
235  (j<MAX_BUFFER_SIZE) && (i<data.size());
236  ++j, ++i)
237  {
238  // DEBUG inspect buffer
239  //std::cout<< " buffer["<<j<<"] = data.at("<<i<<") = "<<data.at(i)<<std::endl;
240  buffer[j] = data.at(i);
241  }
242  //std::cout<<" i="<<i<<", j="<<j<<", data.size()="<<data.size()<<", MAX_BUFFER_SIZE="<<MAX_BUFFER_SIZE<<std::endl;
243  // Write buffer to disk
244  //std::cout<<"Writing "<<j<<" elements to dataset "<<myname()<<" at position "<<offset<<std::endl;
245  write_buffer(buffer,j,offset,force);
246  offset += j;
247  if(i==data.size()) all_data_written = true;
248  }
249  std::size_t new_dset_size = get_dset_length();
250  close_dataset();
251 
252  // Report new size of the dataset so that we can check that all datasets are the same length
253  return new_dset_size;
254  }
255 
262  void write_buffer(const T (&buffer)[MAX_BUFFER_SIZE], const std::size_t length, const std::size_t target_pos, const bool force=false)
263  {
264  if(length>MAX_BUFFER_SIZE)
265  {
266  std::ostringstream errmsg;
267  errmsg << "Error! Received buffer with length ("<<length<<") greater than MAX_BUFFER_SIZE ("<<MAX_BUFFER_SIZE<<") while tring to perform block write for dataset (name="<<myname()<<"). The input to this function is therefore invalid.";
268  printer_error().raise(LOCAL_INFO, errmsg.str());
269  }
270  if(length==0)
271  {
272  std::ostringstream errmsg;
273  errmsg << "Error! Received buffer of length zero! This will cause an error when trying to select element for writing, and there is no point calling this function with no points to write anyway. Please review the input to this function (error occurred while tring to perform block write for dataset (name="<<myname()<<"))";
274  printer_error().raise(LOCAL_INFO, errmsg.str());
275  }
276 
277  // DEBUG dump whole buffer up to length to check it
278  //for(std::size_t i=0;i<length;++i)
279  //{
280  // std::cout<<" buffer["<<i<<"] = "<<buffer[i]<<std::endl;
281  //}
282 
284 
285  // Get the C interface identifier for the type of the output dataset
286  hid_t expected_dtype = get_hdftype_id();
287  hid_t dtype = H5Dget_type(get_dset_id()); // type with which the dset was created
288  if(not H5Tequal(dtype, expected_dtype))
289  {
290  std::ostringstream errmsg;
291  errmsg << "Error! Tried to write to dataset (name="<<myname()<<") with type id "<<dtype<<" but expected it to have type id "<<expected_dtype<<". This is a bug, please report it.";
292  printer_error().raise(LOCAL_INFO, errmsg.str());
293  }
294 
295  std::size_t required_size = target_pos+length;
296  // Check that target position is allowed
297  if(target_pos < get_dset_length())
298  {
299  if(force)
300  {
301  if(required_size > get_dset_length())
302  {
303  // Some overlap into unused space, partial dataset extension required.
304  extend_dset_to(required_size);
305  }
306  // Else whole target block is inside current dataset size. No extension required.
307  }
308  else
309  {
310  std::ostringstream errmsg;
311  errmsg << "Error! Tried to write block to dataset (name="<<myname()<<"), but target index ("<<target_pos<<") is inside the current dataset extents (dset size="<<get_dset_length()<<"), i.e. some of the target slots are already used! This is a bug, please report it.";
312  printer_error().raise(LOCAL_INFO, errmsg.str());
313  }
314  }
315  else
316  {
317  // Target block fully outside current dataset extents. Extend to fit.
318  extend_dset_to(required_size);
319  }
320 
321  // Select output hyperslab
322  // (this also determines what data will be read out of the buffer)
323  std::pair<hid_t,hid_t> selection_ids = select_hyperslab(target_pos,length);
324  hid_t memspace_id = selection_ids.first;
325  hid_t dspace_id = selection_ids.second;
326 
327  // Write the data to the hyperslab.
328  herr_t status = H5Dwrite(get_dset_id(), get_hdftype_id(), memspace_id, dspace_id, H5P_DEFAULT, buffer);
329  if(status<0)
330  {
331  std::ostringstream errmsg;
332  errmsg << "Error writing new chunk to dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Dwrite failed." << std::endl;
333  printer_error().raise(LOCAL_INFO, errmsg.str());
334  }
335 
336  // Release the hyperslab IDs
337  H5Sclose(dspace_id);
338  H5Sclose(memspace_id);
339 
340  //std::cout<<"write_buffer finished; new dataset size is: "<<get_dset_length()<<std::endl;
341  }
342 
344  void write_random(const hid_t loc_id, const std::map<std::size_t,T>& data)
345  {
346  open_dataset(loc_id);
347  bool all_data_written=false;
348  T buffer[MAX_BUFFER_SIZE];
349  hsize_t coords[MAX_BUFFER_SIZE];
350  auto it = data.begin();
351  while(not all_data_written)
352  {
353  // Copy data into buffer up to MAX_BUFFER_SIZE
354  std::size_t j;
355  for(j=0;
356  (j<MAX_BUFFER_SIZE) && (it!=data.end());
357  ++j, ++it)
358  {
359  buffer[j] = it->second;
360  coords[j] = it->first;
361  }
362  // Write buffer to disk
363  if(j>0) write_RA_buffer(buffer,coords,j);
364  if(it==data.end()) all_data_written = true;
365  }
366  close_dataset();
367  }
368 
370  void write_RA_buffer(const T (&buffer)[MAX_BUFFER_SIZE], const hsize_t (&coords)[MAX_BUFFER_SIZE], std::size_t npoints)
371  {
372  if(npoints>MAX_BUFFER_SIZE)
373  {
374  std::ostringstream errmsg;
375  errmsg << "Error! Received npoints ("<<npoints<<") greater than MAX_BUFFER_SIZE ("<<MAX_BUFFER_SIZE<<") while tring to perform RA write for dataset (name="<<myname()<<"). The input to this function is therefore invalid.";
376  printer_error().raise(LOCAL_INFO, errmsg.str());
377  }
378  if(npoints==0)
379  {
380  std::ostringstream errmsg;
381  errmsg << "Error! Received npoints=0! This will cause an error when trying to select element for writing, and there is no point calling this function with no points to write anyway. Please review the input to this function (error occurred while tring to perform RA write for dataset (name="<<myname()<<"))";
382  printer_error().raise(LOCAL_INFO, errmsg.str());
383  }
384 
386 
387  bool error_occurred = false; // simple error flag
388 
389  // DEBUG: check coords array
390  //for(std::size_t i=0; i<npoints; i++) std::cout<<"coords["<<i<<"] = "<<coords[i]<<std::endl;
391 
392  // Check that no data is to be written outside the current dataset extents. This
393  // function is only for writing back to points that already exist!
394  std::size_t max_coord = *std::max_element(coords,coords+npoints);
395  if(max_coord > get_dset_length())
396  {
397  std::ostringstream errmsg;
398  errmsg<<"Attempted to perform RA write to a point outside the current dataset extents (max_coord="<<max_coord<<", dset_length="<<get_dset_length()<<")! The dataset should be resized prior to calling this function, so this is a bug, please report it.";
399  printer_error().raise(LOCAL_INFO, errmsg.str());
400  }
401 
402  // Dataset size in memory
403  static const std::size_t MDIM_RANK = 1;
404  hsize_t mdim[] = {npoints};
405 
406  // Dataspace for the output values
407  hid_t dspace = H5Screate_simple(MDIM_RANK, mdim, NULL);
408  if(dspace<0) error_occurred = true;
409 
410  // Get the C interface identifier for a copy of the dataspace
411  // of the dataset
412  hid_t dspace_id = H5Dget_space(get_dset_id());
413  if(dspace_id<0) error_occurred = true;
414 
415  // Select the target write points in the file dataspace
416  hid_t errflag = H5Sselect_elements(dspace_id, H5S_SELECT_SET, npoints, coords);
417  if(errflag<0) error_occurred = true;
418 
419  // Get the C interface identifier for the type of the output dataset
420  hid_t expected_dtype = get_hdftype_id();
421  hid_t dtype = H5Dget_type(get_dset_id()); // type with which the dset was created
422  if(not H5Tequal(dtype, expected_dtype))
423  {
424  std::ostringstream errmsg;
425  errmsg << "Error! Tried to write to dataset (name="<<myname()<<") with type id "<<dtype<<" but expected it to have type id "<<expected_dtype<<". This is a bug, please report it.";
426  printer_error().raise(LOCAL_INFO, errmsg.str());
427  }
428 
429  // Write data to selected points
430  // (H5P_DEFAULT specifies some transfer properties for the I/O
431  // operation. These are the default values, probably are ok.)
432  hid_t errflag2 = H5Dwrite(get_dset_id(), dtype, dspace, dspace_id, H5P_DEFAULT, buffer);
433 
434  if(errflag2<0) error_occurred = true;
435 
436  if(error_occurred)
437  {
438  std::ostringstream errmsg;
439  errmsg << "Error! Failed to write desynchronised buffer data to file! (dataset name="<<myname()<<")"<<std::endl
440  << "Error flags were:" << std::endl
441  << " dspace : " << dspace << std::endl
442  << " dspace_id: " << dspace_id << std::endl
443  << " errflag : " << errflag << std::endl
444  << " errflag2 : " << errflag2 << std::endl
445  << "Variables:" << std::endl
446  << " dtype = " << dtype;
447  printer_error().raise(LOCAL_INFO, errmsg.str());
448  }
449 
450  H5Tclose(dtype);
451  H5Sclose(dspace_id);
452  H5Sclose(dspace);
453  }
454 
456  std::vector<T> get_chunk(std::size_t offset, std::size_t length) const
457  {
458  // Buffer to receive data (and return from function)
459  std::vector<T> chunkdata(length);
460 
461  // Select hyperslab
462  std::pair<hid_t,hid_t> selection_ids = select_hyperslab(offset,length);
463  hid_t memspace_id = selection_ids.first;
464  hid_t dspace_id = selection_ids.second;
465 
466  // Buffer to receive data
467  void* buffer = &chunkdata[0]; // pointer to contiguous memory within the buffer vector
468 
469  // Get the data from the hyperslab.
470  herr_t err_read = H5Dread(get_dset_id(), get_hdftype_id(), memspace_id, dspace_id, H5P_DEFAULT, buffer);
471 
472  if(err_read<0)
473  {
474  std::ostringstream errmsg;
475  errmsg << "Error retrieving chunk (offset="<<offset<<", length="<<length<<") from dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Dread failed." << std::endl;
476  errmsg << " offset+length = "<< offset+length << std::endl;
477  errmsg << " dset_length() = "<< get_dset_length() << std::endl;
478  printer_error().raise(LOCAL_INFO, errmsg.str());
479  }
480 
481  H5Sclose(dspace_id);
482  H5Sclose(memspace_id);
483 
484  return chunkdata;
485  }
486 
490  void reset(hid_t loc_id)
491  {
492  if(dataset_exists(loc_id))
493  {
494  open_dataset(loc_id);
495  std::size_t remaining_length = get_dset_length();
496  close_dataset();
497  std::size_t target_pos = 0;
498  while(remaining_length>0)
499  {
500  std::vector<T> zero_buffer;
501  if(remaining_length>=MAX_BUFFER_SIZE)
502  {
503  zero_buffer = std::vector<T>(MAX_BUFFER_SIZE);
504  remaining_length -= MAX_BUFFER_SIZE;
505  }
506  else
507  {
508  zero_buffer = std::vector<T>(remaining_length);
509  remaining_length = 0;
510  }
511  write_vector(loc_id, zero_buffer, target_pos, true);
512  target_pos += MAX_BUFFER_SIZE;
513  }
514  }
515  // else the dataset doesn't even exist yet (no buffer flushes have occurred yet),
516  // so don't need to reset anything.
517  }
518 
// Create the on-disk dataset under location_id; the out-of-class template
// definition follows this class.
520  void create_dataset(hid_t location_id);
521 
522  };
523 
// Out-of-class definition of HDF5DataSet<T>::create_dataset (its signature
// line is not captured in this view). Creates an initially empty (dims = 0),
// unlimited-length, chunked (HDF5_CHUNKLENGTH) dataset named myname() with
// element type get_hdftype_id() under location_id.
525  template<class T>
527  {
528  hsize_t dims [DSETRANK];
529  hsize_t maxdims [DSETRANK];
530  hsize_t chunkdims[DSETRANK];
531  //hsize_t slicedims[DSETRANK];
532 
533  // Compute initial dataspace and chunk dimensions
534  dims[0] = 0; // Empty to start
535  maxdims[0] = H5S_UNLIMITED; // No upper limit on number of records allowed in dataset
536  chunkdims[0] = HDF5_CHUNKLENGTH;
537  //slicedims[0] = 1; // Dimensions of a single record in the data space
538 
539  // Create the data space
540  hid_t dspace_id = H5Screate_simple(DSETRANK, dims, maxdims);
541  if(dspace_id<0)
542  {
543  std::ostringstream errmsg;
544  errmsg << "Error creating dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Screate_simple failed.";
545  printer_error().raise(LOCAL_INFO, errmsg.str());
546  }
547 
548  // Object containing dataset creation parameters
549  hid_t cparms_id = H5Pcreate(H5P_DATASET_CREATE);
550  if(cparms_id<0)
551  {
552  std::ostringstream errmsg;
553  errmsg << "Error creating dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Pcreate failed.";
554  printer_error().raise(LOCAL_INFO, errmsg.str());
555  }
556 
// Chunked layout is required because maxdims is H5S_UNLIMITED.
557  herr_t status = H5Pset_chunk(cparms_id, DSETRANK, chunkdims);
558  if(status<0)
559  {
560  std::ostringstream errmsg;
561  errmsg << "Error creating dataset (with name=\""<<myname()<<"\") in HDF5 file. H5Pset_chunk failed.";
562  printer_error().raise(LOCAL_INFO, errmsg.str());
563  }
564 
// NOTE(review): location_id is validated only after dspace_id/cparms_id have
// been created; if raise() throws here, those handles are not closed.
565  // Check if location id is invalid
566  if(location_id==-1)
567  {
568  std::ostringstream errmsg;
569  errmsg << "Error! Tried to create hdf5 dataset (with name=\""<<myname()<<"\") at undefined location (location_id was -1). Please check that calling code supplied a valid location handle. This is a bug, please report it.";
570  printer_error().raise(LOCAL_INFO, errmsg.str());
571  }
572 
573  // Create the dataset
574  hid_t dset_id = H5Dcreate2(location_id, myname().c_str(), get_hdftype_id(), dspace_id, H5P_DEFAULT, cparms_id, H5P_DEFAULT);
575  if(dset_id<0)
576  {
577  std::ostringstream errmsg;
578  errmsg << "Error creating dataset (with name=\""<<myname()<<"\") in HDF5 file. Dataset with same name may already exist";
579  printer_error().raise(LOCAL_INFO, errmsg.str());
580  }
581 
// The dataset is closed right after creation; later accesses re-open it via
// open_dataset().
582  // Release the dataspace IDs
583  H5Sclose(dspace_id);
584  H5Pclose(cparms_id);
585  H5Dclose(dset_id);
586 
// NOTE(review): the statement that followed this comment is not captured in
// this view; presumably set_exists_on_disk() - confirm.
587  // Register that the dataset now exists on disk
589  }
590 
591 
// Body of HDF5BufferBase (class-head line not captured in this view): the
// type-independent interface of a per-dataset output buffer. HDF5Buffer<T>
// implements it, keyed by PPIDpair (rank, pointID).
594  {
595  public:
596 
598  HDF5BufferBase(const std::string& name, const bool sync);
599 
601  virtual ~HDF5BufferBase() {}
602 
604  std::string dset_name() const;
605 
// HDF5Buffer<T> reports the value dataset's exists-on-disk flag.
607  virtual bool exists_on_disk() const = 0;
608 
// Make sure a slot exists for 'ppid' (HDF5Buffer<T> adds missing points as invalid).
610  virtual void update(const PPIDpair& ppid) = 0;
611 
// Write all buffered points to disk in the order given, starting at dataset
// index target_pos. HDF5Buffer<T> requires 'order' to list every buffered
// point exactly once, and empties the buffer afterwards.
613  virtual void block_flush(const hid_t loc_id, const std::vector<PPIDpair>& order, const std::size_t target_pos) = 0;
614 
// Write buffered points to the explicit dataset indices in position_map, and
// remove those points from the buffer.
616  virtual void random_flush(const hid_t loc_id, const std::map<PPIDpair,std::size_t>& position_map) = 0;
617 
618  // Retrieve buffer data in the specified order (removing it from the buffer), converted to double.
619  // NOTE(review): the second vector appears to hold validity flags (the override names it 'out_valid'), not a type ID.
620  virtual std::pair<std::vector<double>,std::vector<int>> flush_to_vector_dbl(const std::vector<PPIDpair>& order) = 0;
621  // Same as flush_to_vector_dbl, but with values returned as long.
622  virtual std::pair<std::vector<long>,std::vector<int>> flush_to_vector_int(const std::vector<PPIDpair>& order) = 0;
623 
624 #ifdef WITH_MPI
// Send the buffered points to MPI rank r (HDF5Buffer<T> then clears the buffer).
625  virtual void MPI_flush_to_rank(const unsigned int r) = 0;
627 
628 #endif
629 
631  virtual void ensure_dataset_exists(const hid_t loc_id, const std::size_t length) = 0;
632 
// HDF5Buffer<T> permits this only for buffers that are not synchronised.
634  virtual void reset(hid_t loc_id) = 0;
635 
636  // Report whether this buffer is synchronised
637  bool is_synchronised() const;
638 
639  // Report the number of items currently in the buffer
640  virtual std::size_t N_items_in_buffer() = 0;
641 
643  std::set<PPIDpair> get_points_set() const;
644 
646  virtual int get_type_id() const = 0;
647 
648  private:
649 
651  std::string _dset_name;
652 
656 
657  protected:
658 
// All points currently held in the buffer.
660  std::set<PPIDpair> buffer_set;
661 
// Per-point validity flag: 1 = valid value stored, 0 = placeholder
// (set by HDF5Buffer<T>::append / update respectively).
663  std::map<PPIDpair,int> buffer_valid;
664  };
665 
667  template<class T>
669  {
670  public:
671 
673  HDF5Buffer(const std::string& name, const bool sync, const std::vector<PPIDpair>& buffered_points
674 #ifdef WITH_MPI
675  // Gambit MPI communicator context for use within the hdf5 printer system
676  , GMPI::Comm& comm
677 #endif
678  )
679  : HDF5BufferBase(name,sync)
680  , my_dataset(name)
681  , my_dataset_valid(name+"_isvalid")
682 #ifdef WITH_MPI
683  , myComm(comm)
684 #endif
685  {
686  // Add points known to other buffers (as 'invalid' data, for synchronisation purposes)
687  for(auto it=buffered_points.begin(); it!=buffered_points.end(); ++it)
688  {
689  update(*it);
690  }
691  }
692 
694  void update(const PPIDpair& ppid)
695  {
696  buffer[ppid]; // Create point with default value if it doesn't exist
697  buffer_set.insert(ppid);
698  auto it = buffer_valid.find(ppid);
699  // If point not already in the buffer, set it as invalid
700  if(it==buffer_valid.end())
701  {
702  buffer_valid[ppid] = 0;
703  // DEBUG
704  //std::cout<<"Set point "<<ppid<<" to 'invalid' for buffer "<<dset_name()<<std::endl;
705  }
706  }
707 
709  void append(T const& value, const PPIDpair& ppid)
710  {
711  buffer [ppid] = value;
712  buffer_valid[ppid] = 1;
713  buffer_set.insert(ppid);
714  //logger()<<" ***Added valid data "<<value<<" to point "<<ppid<<" in buffer "<<dset_name()<<std::endl;
715  }
716 
719  void block_flush(const hid_t loc_id, const std::vector<PPIDpair>& order, const std::size_t target_pos)
720  {
721  // Make sure output order is same size as the buffer to be output
722  if(order.size() != buffer.size())
723  {
724  std::ostringstream errmsg;
725  errmsg << "Supplied buffer ordering vector is not the same size as the buffer (buffer.size()="<<buffer.size()<<", order.size()="<<order.size()<<"; dset_name()="<<dset_name()<<"). This is a bug, please report it." <<std::endl;
726  errmsg << "Extra debug information:" << std::endl;
727  errmsg << " buffer.size() = "<<buffer.size()<<std::endl;
728  errmsg << " buffer_valid.size() = "<<buffer_valid.size()<<std::endl;
729  errmsg << " buffer_set.size() = "<<buffer_set.size()<<std::endl;
730  printer_error().raise(LOCAL_INFO, errmsg.str());
731  }
732 
733  // Need to keep track of whether buffer points have been added to the ordered output
734  std::set<PPIDpair> done;
735 
736  // Create a vector version of the buffer in the specified order
737  std::vector<T> ordered_buffer;
738  std::vector<int> ordered_buffer_valid;
739  for(auto ppid_it=order.begin(); ppid_it!=order.end(); ++ppid_it)
740  {
741  if(done.count(*ppid_it)!=0)
742  {
743  std::ostringstream errmsg;
744  errmsg << "Supplied buffer ordering vector contains a duplicate PPIDpair! This is a bug, please report it.";
745  printer_error().raise(LOCAL_INFO, errmsg.str());
746  }
747  ordered_buffer .push_back(buffer .at(*ppid_it));
748  ordered_buffer_valid.push_back(buffer_valid.at(*ppid_it));
749  done.insert(*ppid_it);
750  }
751 
752  // Check if any points were not added to the ordered buffer
753  std::set<PPIDpair> not_done = set_diff(buffer_set,done);
754 
755  if(not_done.size()>0)
756  {
757  std::ostringstream errmsg;
758  errmsg << "Supplied buffer ordering vector does not specify order positions for all points in the buffer! This is a bug, please report it.";
759  printer_error().raise(LOCAL_INFO, errmsg.str());
760  }
761 
762  if(ordered_buffer.size() != buffer.size())
763  {
764  std::ostringstream errmsg;
765  errmsg << "The ordered buffer we just constructed is not the same size as the original buffer! This is a bug, please report it.";
766  printer_error().raise(LOCAL_INFO, errmsg.str());
767  }
768 
769  // Perform dataset writes
770  #ifdef HDF5PRINTER2_DEBUG
772  logger()<<"Writing block of data to disk for dataset "<<dset_name()<<std::endl;
773  logger()<<" Data to write (to target_pos="<<target_pos<<"):"<<std::endl;
774  for(auto it=ordered_buffer.begin(); it!=ordered_buffer.end(); ++it)
775  {
776  logger()<<" "<<*it<<std::endl;
777  }
778  logger()<<EOM;
779  #endif
780 
781  std::size_t newsize = my_dataset .write_vector(loc_id,ordered_buffer ,target_pos);
782  std::size_t newsize_v = my_dataset_valid.write_vector(loc_id,ordered_buffer_valid,target_pos);
783  if(newsize!=newsize_v)
784  {
785  std::ostringstream errmsg;
786  errmsg<<"Inconsistent dataset sizes detected after buffer flush! (newsize="<<newsize<<", newsize_v="<<newsize_v<<")";
787  printer_error().raise(LOCAL_INFO, errmsg.str());
788  }
789 
790  // Clear buffer variables
791  buffer .clear();
792  buffer_valid.clear();
793  buffer_set.clear();
794  }
795 
799  void random_flush(const hid_t loc_id, const std::map<PPIDpair,std::size_t>& position_map)
800  {
801  std::map<std::size_t,T> pos_buffer;
802  std::map<std::size_t,int> pos_buffer_valid;
803 
804  // DEBUG inspect buffer
805  //for(auto it=buffer.begin(); it!=buffer.end(); ++it)
806  //{
807  // std::cout<<"buffer["<<it->first<<"] = "<<it->second<<std::endl;
808  //}
809 
810  for(auto it=position_map.begin(); it!=position_map.end(); ++it)
811  {
812  const PPIDpair& ppid = it->first;
813  const std::size_t& position = it->second;
814  auto bit = buffer .find(ppid);
815  auto vit = buffer_valid.find(ppid);
816  if(bit==buffer.end() or vit==buffer_valid.end())
817  {
818  std::ostringstream errmsg;
819  errmsg<<"Could not find point "<<ppid<<" in buffer! This is a bug, please report it."<<std::endl;
820  printer_error().raise(LOCAL_INFO, errmsg.str());
821  }
822  if(vit->second) // I think there is no reason to write the RA data to disk if it is invalid. Buffers should have been reset if need to clear points.
823  {
824  pos_buffer [position] = bit->second;
825  pos_buffer_valid[position] = vit->second;
826  }
827  // Erase point from buffer
828  buffer .erase(bit);
829  buffer_valid.erase(vit);
830  buffer_set.erase(ppid);
831  }
832  // Perform dataset writes
833  my_dataset .write_random(loc_id, pos_buffer );
834  my_dataset_valid.write_random(loc_id, pos_buffer_valid);
835  }
836 
839  void reset(hid_t loc_id)
840  {
841  if(not is_synchronised())
842  {
843  // Only need to clear the "validity" dataset
844  // Doesn't matter what values are in the main datasets
845  // once they are marked as 'invalid'.
846  buffer .clear();
847  buffer_valid.clear();
848  buffer_set.clear();
849  //my_dataset .reset(loc_id);
850  my_dataset_valid.reset(loc_id);
851  }
852  else
853  {
854  std::ostringstream errmsg;
855  errmsg<<"Reset called on buffer for data label "<<dset_name()<<", however this output stream is marked as 'synchronised'. It therefore cannot be reset! This is a bug, please report it.";
856  printer_error().raise(LOCAL_INFO, errmsg.str());
857  }
858  }
859 
861  void ensure_dataset_exists(const hid_t loc_id, const std::size_t length)
862  {
863  my_dataset .ensure_dataset_exists(loc_id,length);
864  my_dataset_valid.ensure_dataset_exists(loc_id,length);
865  }
866 
868  bool exists_on_disk() const
869  {
870  return my_dataset.get_exists_on_disk();
871  // TODO: Should make sure that 'valid' dataset also exists on disk
872  }
873 
874  // Report the number of items currently in the buffer;
875  std::size_t N_items_in_buffer()
876  {
878  if( buffer.size()!=buffer_set.size()
879  or buffer.size()!=buffer_valid.size())
880  {
881  std::ostringstream errmsg;
882  errmsg<<"Internal inconsistency detected in buffer for dataset "<<dset_name()<<"; the following variables should all be the same size, but are not:"<<std::endl;
883  errmsg<<" buffer .size() = "<<buffer .size()<<std::endl;
884  errmsg<<" buffer_valid.size() = "<<buffer_valid.size()<<std::endl;
885  errmsg<<" buffer_set .size() = "<<buffer_set .size()<<std::endl;
886  printer_error().raise(LOCAL_INFO, errmsg.str());
887  }
888  return buffer.size();
889  }
890 
891 #ifdef WITH_MPI
892  // Send buffer contents to a different process
893  void MPI_flush_to_rank(const unsigned int r)
894  {
895  if(buffer.size()>0)
896  {
897  // Get name of the dataset this buffer is associated with
898  std::string namebuf = dset_name();
899  // Copy point data and values into MPI send buffer
900  std::vector<unsigned long> pointIDs;
901  std::vector<unsigned int> ranks; // Will assume all PPIDpairs are valid. I think this is fine to do...
902  std::vector<int> valid; // We have to send the invalid points too, to maintain buffer synchronicity
903  std::vector<T> values;
904  int type(h5v2_type<T>()); // Get integer identifying the type of the data values
905  int more_buffers = 1; // Flag indicating that there is a block of data to receive
906  for(auto it=buffer.begin(); it!=buffer.end(); ++it)
907  {
908  pointIDs.push_back(it->first.pointID);
909  ranks .push_back(it->first.rank);
910  valid .push_back(buffer_valid.at(it->first));
911  values .push_back(it->second);
912  }
913 
914  // Debug info
915  #ifdef HDF5PRINTER2_DEBUG
916  logger()<<LogTags::printers<<LogTags::debug<<"Sending points for buffer "<<dset_name()<<std::endl
917  <<" (more_buffers: "<<more_buffers<<")"<<std::endl;
918  for(std::size_t i=0; i<buffer.size(); ++i)
919  {
920  logger()<<" Sending point ("<<ranks.at(i)<<", "<<pointIDs.at(i)<<")="<<values.at(i)<<" (valid="<<valid.at(i)<<")"<<std::endl;
921  }
922  logger()<<EOM;
923  #endif
924 
925  // Send the buffers
926  std::size_t Npoints = values.size();
927  myComm.Send(&more_buffers, 1, r, h5v2_BLOCK);
928  //std::cerr<<myComm.Get_rank()<<": sent "<<more_buffers<<std::endl;
929  //send_counter+=1;
930  myComm.Send(&namebuf[0] , namebuf.size(), MPI_CHAR, r, h5v2_bufname);
931  myComm.Send(&type , 1 , r, h5v2_bufdata_type);
932  myComm.Send(&values[0] , Npoints, r, h5v2_bufdata_values);
933  myComm.Send(&pointIDs[0], Npoints, r, h5v2_bufdata_points);
934  myComm.Send(&ranks[0] , Npoints, r, h5v2_bufdata_ranks);
935  myComm.Send(&valid[0] , Npoints, r, h5v2_bufdata_valid);
936 
937  // Clear buffer variables
938  buffer .clear();
939  buffer_valid.clear();
940  buffer_set .clear();
941  }
942  }
943 
944  // Receive buffer contents from a different process
945  // (MasterBuffer should have received the name, type, and size of the
946  // buffer data, and used this to construct/retrieve this buffer.
947  // We then collect the buffer data messages)
948  void MPI_recv_from_rank(unsigned int r, std::size_t Npoints)
949  {
951  std::vector<unsigned long> pointIDs(Npoints);
952  std::vector<unsigned int> ranks(Npoints);
953  std::vector<int> valid(Npoints);
954  std::vector<T> values(Npoints);
955 
956  // Receive buffer data
957  myComm.Recv(&values[0] , Npoints, r, h5v2_bufdata_values);
958  myComm.Recv(&pointIDs[0], Npoints, r, h5v2_bufdata_points);
959  myComm.Recv(&ranks[0] , Npoints, r, h5v2_bufdata_ranks);
960  myComm.Recv(&valid[0] , Npoints, r, h5v2_bufdata_valid);
961 
962  // Pack it into this buffer
963  #ifdef HDF5PRINTER2_DEBUG
964  logger()<<LogTags::printers<<LogTags::debug<<"Adding points to buffer "<<dset_name()<<std::endl;
965  for(std::size_t i=0; i<Npoints; ++i)
966  {
967  // Extra Debug
968  logger()<<" Adding received point ("<<ranks.at(i)<<", "<<pointIDs.at(i)<<")="<<values.at(i)<<" (valid="<<valid.at(i)<<")"<<std::endl;
969  PPIDpair ppid(pointIDs.at(i), ranks.at(i));
970  if(valid.at(i))
971  {
972  append(values.at(i), ppid);
973  }
974  else
975  {
976  update(ppid);
977  }
978  }
979  logger()<<EOM;
980  #endif
981 
982  // Debug info:
983  //std::cout<<"(rank "<<myComm.Get_rank()<<") Final buffer size: "<<N_items_in_buffer()<<" (Npoints was: "<<Npoints<<"), dset="<<dset_name()<<std::endl;
984  }
985 #endif
986 
987  void add_float_block(const HDF5bufferchunk& chunk, const std::size_t buf)
988  {
989  // Pack it into this buffer
990  #ifdef HDF5PRINTER2_DEBUG
991  logger()<<LogTags::printers<<LogTags::debug<<"Adding 'float type' points to buffer "<<dset_name()<<std::endl;
992  #endif
993  for(std::size_t i=0; i<chunk.used_size; ++i)
994  {
995  bool valid = chunk.valid[buf][i];
996  PPIDpair ppid(chunk.pointIDs[i], chunk.ranks[i]);
997  if(valid)
998  {
999  T value = static_cast<T>(chunk.values[buf][i]);
1000  #ifdef HDF5PRINTER2_DEBUG
1001  logger()<<" Adding valid point (rank="<<chunk.ranks[i]<<", pointID="<<chunk.pointIDs[i]<<", value="<<value<<")"<<std::endl;
1002  #endif
1003  append(value, ppid);
1004  }
1005  else
1006  {
1007  #ifdef HDF5PRINTER2_DEBUG
1008  logger()<<" Updating with invalid point (rank="<<chunk.ranks[i]<<", pointID="<<chunk.pointIDs[i]<<")"<<std::endl;
1009  #endif
1010  update(ppid);
1011  }
1012  }
1013  #ifdef HDF5PRINTER2_DEBUG
1014  logger()<<EOM;
1015  #endif
1016  }
1017 
1018  void add_int_block(const HDF5bufferchunk& chunk, const std::size_t buf)
1019  {
1020  // Pack it into this buffer
1021  #ifdef HDF5PRINTER2_DEBUG
1022  logger()<<LogTags::printers<<LogTags::debug<<"Adding 'int type' points (from chunk["<<buf<<"] with name ID "<<chunk.name_id[buf]<<") to buffer "<<dset_name()<<std::endl;
1023  #endif
1024  for(std::size_t i=0; i<chunk.used_size; ++i)
1025  {
1026  bool valid = chunk.valid[buf][i];
1027  PPIDpair ppid(chunk.pointIDs[i], chunk.ranks[i]);
1028  if(valid)
1029  {
1030  T value = static_cast<T>(chunk.values_int[buf][i]);
1031  #ifdef HDF5PRINTER2_DEBUG
1032  logger()<<" Adding valid point (rank="<<chunk.ranks[i]<<", pointID="<<chunk.pointIDs[i]<<", value="<<value<<")"<<std::endl;
1033  #endif
1034  append(value, ppid);
1035  }
1036  else
1037  {
1038  #ifdef HDF5PRINTER2_DEBUG
1039  logger()<<" Updating with invalid point (rank="<<chunk.ranks[i]<<", pointID="<<chunk.pointIDs[i]<<")"<<std::endl;
1040  #endif
1041  update(ppid);
1042  }
1043  }
1044  #ifdef HDF5PRINTER2_DEBUG
1045  logger()<<EOM;
1046  #endif
1047 
1048  // // Super debug; check entire buffer contents
1049  // logger()<<LogTags::printers<<LogTags::debug;
1050  // logger()<<"Checking buffer contents for dataset "<<dset_name()<<std::endl;
1051  // for(auto it=buffer.begin(); it!=buffer.end(); ++it)
1052  // {
1053  // logger()<<" "<<it->first<<", "<<it->second<<std::endl;
1054  // }
1055  // logger()<<EOM;
1056 
1057  }
1058 
1059  // Retrieve buffer data in specified order (removing the points specified in 'order' from the buffer)
1060  // Points not in the buffer are returned as "invalid"
1061  // As a double.
1062  std::pair<std::vector<double>,std::vector<int>> flush_to_vector_dbl(const std::vector<PPIDpair>& order)
1063  {
1064  std::vector<double> out_values;
1065  std::vector<int> out_valid;
1066  for(auto it=order.begin(); it!=order.end(); ++it)
1067  {
1068  if(buffer_set.find(*it)!=buffer_set.end())
1069  {
1070  // Add to output vector
1071  out_values.push_back((double)buffer.at(*it));
1072  out_valid .push_back(buffer_valid.at(*it));
1073  // Remove from buffer
1074  buffer_set .erase(*it);
1075  buffer .erase(*it);
1076  buffer_valid.erase(*it);
1077  }
1078  else
1079  {
1080  out_values.push_back(0);
1081  out_valid .push_back(0);
1082  }
1083  }
1084  return std::make_pair(out_values,out_valid);
1085  }
1086 
1087  // int version
1088  std::pair<std::vector<long>,std::vector<int>> flush_to_vector_int(const std::vector<PPIDpair>& order)
1089  {
1090  std::vector<long> out_values;
1091  std::vector<int> out_valid;
1092  for(auto it=order.begin(); it!=order.end(); ++it)
1093  {
1094  if(buffer_set.find(*it)!=buffer_set.end())
1095  {
1096  // Add to output vector
1097  out_values.push_back((long)buffer.at(*it));
1098  out_valid .push_back(buffer_valid.at(*it));
1099  // Remove from buffer
1100  buffer_set .erase(*it);
1101  buffer .erase(*it);
1102  buffer_valid.erase(*it);
1103  }
1104  else
1105  {
1106  out_values.push_back(0);
1107  out_valid .push_back(0);
1108  }
1109  }
1110  return std::make_pair(out_values,out_valid);
1111  }
1112 
1114  int get_type_id() const
1115  {
1116  return my_dataset.get_type_id();
1117  }
1118 
1119  private:
1120 
1124 
      /// Buffer containing points to be written to disk upon "flush", keyed by (pointID, rank) pair.
1126  std::map<PPIDpair,T> buffer;
1127 
1128 #ifdef WITH_MPI
1129  // Gambit MPI communicator context for use within the hdf5 printer system
      // Held by reference: the communicator object must outlive this buffer.
1130  GMPI::Comm& myComm;
1131 #endif
1132  };
1133 
1135  template<class T>
1137  {
1138  public:
1139 
1142 #ifdef WITH_MPI
1143  , GMPI::Comm& comm
1144 #endif
1145  ) : synchronised(sync)
1146 #ifdef WITH_MPI
1147  , myComm(comm)
1148 #endif
1149  {}
1150 
1152  // Currently buffered points need to be supplied in case we have to create and fill a new buffer
1153  HDF5Buffer<T>& get_buffer(const std::string& label, const std::vector<PPIDpair>& buffered_points)
1154  {
1155  auto it=my_buffers.find(label);
1156  if(it==my_buffers.end())
1157  {
1158  // No buffer with this name. Need to create one!
1159  my_buffers.emplace(label,HDF5Buffer<T>(label,synchronised,buffered_points
1160 #ifdef WITH_MPI
1161  , myComm
1162 #endif
1163  ));
1164  it=my_buffers.find(label);
1165  }
1166  return it->second;
1167  }
1168 
1169  private:
1170 
1171  std::map<std::string,HDF5Buffer<T>> my_buffers;
1173 #ifdef WITH_MPI
1174  // Gambit MPI communicator context for use within the hdf5 printer system
1175  GMPI::Comm& myComm;
1176 #endif
1177  };
1178 
1182  {
1183 
1184  public:
1185 
      /// Constructor.
      /// NOTE(review): parameter roles inferred, constructor body not visible here:
      ///   filename / groupname: presumably stored in the `file` (output HDF5 file) and
      ///                         `group` (HDF5 group location for datasets) members -- TODO confirm
      ///   sync:                 whether the managed buffers are synchronised (cf. is_synchronised())
      ///   buffer_length:        presumably initialises the `buffer_length` member (max allowed buffer size)
1187  HDF5MasterBuffer(const std::string& filename, const std::string& groupname, const bool sync, const std::size_t buffer_length
1188 #ifdef WITH_MPI
1189  , GMPI::Comm& comm
1190 #endif
1191  );
1192 
      /// Destructor (definition not visible in this header)
1194  ~HDF5MasterBuffer();
1195 
1197  template<class T>
1198  void schedule_print(T const& value, const std::string& label, const unsigned int mpirank, const unsigned long pointID)
1199  {
1201  PPIDpair thispoint(pointID,mpirank);
1202  auto it = buffered_points_set.find(thispoint);
1203  if(it==buffered_points_set.end())
1204  {
1206  if(buffered_points.size() != buffered_points_set.size())
1207  {
1208  std::stringstream msg;
1209  msg<<"Inconsistency detected between buffered_points and buffered_points_set sizes ("<<buffered_points.size()<<" vs "<<buffered_points_set.size()<<")! This is a bug, please report it."<<std::endl;
1210  printer_error().raise(LOCAL_INFO,msg.str());
1211  }
1212 
1214  if(is_synchronised() and buffered_points.size()>get_buffer_length())
1215  {
1217  std::stringstream msg;
1218  msg<<"The allowed sync buffer size has somehow been exceeded! Buffers should have been flushed when they were full. This is a bug, please report it.";
1219  printer_error().raise(LOCAL_INFO,msg.str());
1220  }
1221  else if(buffered_points.size()==get_buffer_length())
1222  {
1223  // Buffer full, flush it out
1224  flush();
1225  }
1226  else if(not is_synchronised() and buffered_points.size()>get_buffer_length())
1227  {
1229 
1231  if((buffered_points.size()%1000)==0)
1232  {
1233  flush();
1234 
1235  std::stringstream msg;
1236  msg<<"The number of unflushable points in the non-synchronised print buffers is getting large (current buffer length is "<<buffered_points.size()<<"; soft max limit was "<<get_buffer_length()<<"). This may indicate that some process has not been properly printing the synchronised points that it is computing. If nothing changes this process may run out of RAM for the printer buffers and crash.";
1237  printer_warning().raise(LOCAL_INFO,msg.str());
1238  }
1239  }
1240 
1241  // Inform all buffers of this new point
1242  update_all_buffers(thispoint);
1243  // DEBUG
1244  //std::cout<<"Adding point to buffered_points list: "<<thispoint<<std::endl;
1245  buffered_points.push_back(thispoint);
1246  buffered_points_set.insert(thispoint);
1247  }
1248 
1249  // Add the new data to the buffer
1250  get_buffer<T>(label,buffered_points).append(value,thispoint);
1251  }
1252 
      /// Flush buffered data. schedule_print calls this when buffered_points reaches
      /// get_buffer_length() points. It is also called each time an over-full
      /// non-synchronised buffer reaches a multiple of 1000 points.
1254  void flush();
1255 
1256  #ifdef WITH_MPI
      // NOTE(review): definitions of the following MPI helpers are not visible here; judging by
      // their names they move buffer contents to/from the given `rank` -- confirm against the .cpp
1257  void MPI_flush_to_rank(const unsigned int rank);
1260 
1262  void MPI_request_buffer_data(const unsigned int rank);
1263 
1265  void MPI_recv_all_buffers(const unsigned int rank);
1266 
1269  template<class T>
1270  int MPI_recv_buffer(const unsigned int r, const std::string& dset_name)
1271  {
1272  // Get number of points to be received
1273  MPI_Status status;
1274  myComm.Probe(r, h5v2_bufdata_values, &status);
1275  int Npoints;
1276  int err = MPI_Get_count(&status, GMPI::get_mpi_data_type<T>::type(), &Npoints);
1277  if(err<0)
1278  {
1279  std::stringstream msg;
1280  msg<<"Error from MPI_Get_count while attempting to receive buffer data from rank "<<r<<" for dataset "<<dset_name<<"!";
1281  printer_error().raise(LOCAL_INFO,msg.str());
1282  }
1283  HDF5Buffer<T>& buffer = get_buffer<T>(dset_name, buffered_points);
1284  //std::cout<<"(rank "<<myComm.Get_rank()<<") Npoints: "<<Npoints<<std::endl;
1285  buffer.MPI_recv_from_rank(r, Npoints);
1286  logger()<< LogTags::printers << LogTags::debug << "Received "<<Npoints<<" points from rank "<<r<<"'s buffers (for dataset: "<<dset_name<<")"<<EOM;
1287  //std::cout<<"(rank "<<myComm.Get_rank()<<") Received "<<Npoints<<" from rank "<<r<<". New buffer size is "<<buffer.N_items_in_buffer()<<" (name="<<buffer.dset_name()<<")"<<std::endl;
1288  return Npoints;
1289  }
1290 
1292  template<class T>
1293  void MPI_add_int_block_to_buffer(const HDF5bufferchunk& chunk, const std::string& dset_name, const std::size_t dset_index)
1294  {
1295  HDF5Buffer<T>& buffer = get_buffer<T>(dset_name, buffered_points);
1296  buffer.add_int_block(chunk,dset_index);
1297  }
1298 
1299  template<class T>
1300  void MPI_add_float_block_to_buffer(const HDF5bufferchunk& chunk, const std::string& dset_name, const std::size_t dset_index)
1301  {
1302  HDF5Buffer<T>& buffer = get_buffer<T>(dset_name, buffered_points);
1303  buffer.add_float_block(chunk,dset_index);
1304  }
1305 
1306  // Add a vector of buffer chunk data to the buffers managed by this object
      // NOTE(review): buf_types presumably maps each chunk buffer index to a (dataset name,
      // type ID) pair, matching the (dset_name, dset_index) arguments of the
      // MPI_add_*_block_to_buffer helpers -- confirm against the definition.
1307  void add_to_buffers(const std::vector<HDF5bufferchunk>& blocks, const std::vector<std::pair<std::string,int>>& buf_types);
1308 
      // (end of WITH_MPI section)
1309  #endif
1310 
      // Public interface of HDF5MasterBuffer; definitions are not part of this header.
1312  void reset();
1313 
1315  void resynchronise();
1316 
1318  bool all_buffers_empty();
1319 
      /// Queried by schedule_print to choose between the synchronised and
      /// non-synchronised "buffer full" handling.
1321  bool is_synchronised();
1322 
1324  std::string buffer_status();
1325 
      // NOTE(review): presumably return the `file` / `group` members -- confirm
1327  std::string get_file();
1328 
1330  std::string get_group();
1331 
      /// Buffer size limit: schedule_print flushes when buffered_points reaches this many points
      /// (a soft limit for non-synchronised buffers).
1333  std::size_t get_buffer_length();
1334 
1336  std::size_t get_Npoints();
1337 
1339  void extend_all_datasets_to(const std::size_t length);
1340 
1342  std::map<ulong, ulong> get_highest_PPIDs(const int mpisize);
1343 
1345  void lock_and_open_file(const char access_type='w'); // read/write allowed by default
1346 
1348  void close_and_unlock_file();
1349 
1351  hid_t get_location_id();
1352 
1354  std::size_t get_next_free_position();
1355 
1357  std::size_t get_Nbuffers();
1358 
1360  double get_sizeMB();
1361 
1363  std::vector<std::pair<std::string,int>> get_all_dset_names_on_disk();
1364 
      // NOTE(review): return types match the all_buffers / buffered_points_set members;
      // presumably these return them by const reference -- confirm
1366  const std::map<std::string,HDF5BufferBase*>& get_all_buffers();
1367 
1369  const std::set<PPIDpair>& get_all_points();
1370 
      /// Stop tracking points that have already been removed from the buffers
1372  // (only intended to be used when points have been removed from buffers by e.g. MPI-related
1373  // routines like flush_to_vector)
1374  void untrack_points(const std::set<PPIDpair>& removed_points);
1375 
1376  private:
1377 
      /// Map containing pointers to all buffers managed by this object
1379  std::map<std::string,HDF5BufferBase*> all_buffers;
1380 
      /// PPIDpairs currently stored in the printer buffers; this also defines the order in which they are written
1385  std::vector<PPIDpair> buffered_points;
1386 
      /// Same points as buffered_points, kept as a set for fast "is this point buffered?" lookups
      /// (schedule_print raises an error if the two ever differ in size)
1389  std::set<PPIDpair> buffered_points_set;
1390 
1393 
      /// Max allowed size of buffer
1395  std::size_t buffer_length;
1396 
      /// Retrieve buffer of our type for a given label (specialised per printable type below the class).
      /// Note: the parameter `buffered_points` shadows the member of the same name.
1398  template<class T>
1399  HDF5Buffer<T>& get_buffer(const std::string& label, const std::vector<PPIDpair>& buffered_points);
1400 
1402  void update_buffer_map(const std::string& label, HDF5BufferBase& buff);
1403 
      /// Called by schedule_print whenever a new point is first seen
1407  void update_all_buffers(const PPIDpair& ppid);
1408 
1410  std::map<PPIDpair,std::size_t> get_position_map(const std::vector<PPIDpair>& buffer) const;
1411 
1413  std::string file; // Output HDF5 file
1414  std::string group; // HDF5 group location to store datasets
1415 
1416  // Handles for HDF5 files and groups containing the datasets
      // NOTE(review): the member declarations for the handles/flags described below are absent from this copy
1419 
1420  // Handle to a location in a HDF5 to which the datasets will be written
1421  // i.e. a file or a group.
1423 
      /// File locking object for the output hdf5 file
1425  Utils::FileLock hdf5out;
1426 
1427  // Flag to register whether HDF5 file is open
1429 
1430  // Flag to register whether HDF5 file is locked for us to use
1432 
1434  void ensure_file_is_open() const;
1435 
1437  // Need a map for every directly printable type, and a specialisation
1438  // of 'get_buffer' to access it.
      // NOTE(review): the active members (hdf5_buffers_int/_uint/_long/_ulong/_float/_double, per the
      // generated index) are missing from this copy; the long long variants are disabled, like their
      // get_buffer specialisations.
1443  //HDF5MasterBufferT<longlong > hdf5_buffers_longlong;
1444  //HDF5MasterBufferT<ulonglong> hdf5_buffers_ulonglong;
1447 
1448 #ifdef WITH_MPI
1449  // Gambit MPI communicator context for use within the hdf5 printer system
      // Held by reference: the communicator object must outlive this master buffer.
1450  GMPI::Comm& myComm;
1451 #endif
1452 
1453  };
1454 
      // Explicit specialisations of HDF5MasterBuffer::get_buffer, one per directly printable type.
      // NOTE(review): the primary template is only declared, so get_buffer<T> for any other T
      // presumably fails to link. The long long variants are disabled to match the commented-out
      // hdf5_buffers_longlong / hdf5_buffers_ulonglong members.
1456  template<> HDF5Buffer<int >& HDF5MasterBuffer::get_buffer<int >(const std::string& label, const std::vector<PPIDpair>&);
1457  template<> HDF5Buffer<uint >& HDF5MasterBuffer::get_buffer<uint >(const std::string& label, const std::vector<PPIDpair>&);
1458  template<> HDF5Buffer<long >& HDF5MasterBuffer::get_buffer<long >(const std::string& label, const std::vector<PPIDpair>&);
1459  template<> HDF5Buffer<ulong >& HDF5MasterBuffer::get_buffer<ulong >(const std::string& label, const std::vector<PPIDpair>&);
1460  //template<> HDF5Buffer<longlong >& HDF5MasterBuffer::get_buffer<longlong >(const std::string& label, const std::vector<PPIDpair>&);
1461  //template<> HDF5Buffer<ulonglong>& HDF5MasterBuffer::get_buffer<ulonglong>(const std::string& label, const std::vector<PPIDpair>&);
1462  template<> HDF5Buffer<float >& HDF5MasterBuffer::get_buffer<float >(const std::string& label, const std::vector<PPIDpair>&);
1463  template<> HDF5Buffer<double >& HDF5MasterBuffer::get_buffer<double >(const std::string& label, const std::vector<PPIDpair>&);
1464 
      /// The main printer class for output to HDF5 format.
      /// NOTE(review): several declarations of this class (e.g. the buffermaster and primary_printer
      /// members, and the DECLARE_PRINT expansions) are missing from this copy of the file.
1466  class HDF5Printer2: public BasePrinter
1467  {
1468  public:
      /// Constructor. `primary` defaults to NULL.
      /// NOTE(review): presumably non-NULL only for auxiliary printers (cf. link_primary_printer) -- confirm
1470  HDF5Printer2(const Options& options, BasePrinter* const primary = NULL);
1471 
1473  ~HDF5Printer2();
1474 
1476  std::string get_filename();
1477 
1479  std::string get_groupname();
1480 
1482  std::size_t get_buffer_length();
1483 
1487 
1488  // Initialisation function
1489  // Run by dependency resolver, which supplies the functors with a vector of VertexIDs whose requiresPrinting flags are set to true.
1490  void initialise(const std::vector<int>&);
1491  void flush();
1492  void reset(bool force=false);
1493  void finalise(bool abnormal=false);
1494 
1495  // Get options required to construct a reader object that can read
1496  // the previous output of this printer.
1497  Options resume_reader_options();
1498 
1500 
1502  using BasePrinter::_print; // Tell compiler we are using some of the base class overloads of this on purpose.
      // DECLARE_PRINT declares one _print(elem const&, label, vertexID, rank, pointID)-style overload per type
      // in a Boost.PP sequence. NOTE(review): the BOOST_PP_SEQ_FOR_EACH_I invocations that use it
      // (presumably over HDF5_TYPES, plus HDF5_BACKEND_TYPES when not SCANNER_STANDALONE) are missing here.
1503  #define DECLARE_PRINT(r,data,i,elem) void _print(elem const&, const std::string&, const int, const uint, const ulong);
1505  #ifndef SCANNER_STANDALONE
1507  #endif
1508  #undef DECLARE_PRINT
1509 
1512  void add_aux_buffer(HDF5MasterBuffer& aux_buffermaster);
1513 
1516  HDF5Printer2* get_HDF5_primary_printer();
1517 
1518 #ifdef WITH_MPI
1519  GMPI::Comm& get_Comm();
1521 #endif
1522 
1523  private:
1524 
1526  HDF5Printer2* link_primary_printer(BasePrinter* const primary);
1527 
1530 
1531  std::size_t myRank;
1532  std::size_t mpiSize;
1533 
1534 #ifdef WITH_MPI
1535  GMPI::Comm myComm; // initially attaches to MPI_COMM_WORLD
1537 
1539  std::pair<std::map<std::string,int>,std::vector<std::pair<std::string,int>>> get_buffer_idcodes(const std::vector<HDF5MasterBuffer*>& masterbuffers);
1540 
1542  void gather_and_print(HDF5MasterBuffer& out_printbuffer, const std::vector<HDF5MasterBuffer*>& masterbuffers, bool sync);
1543 
1544  // Gather (via MPI) all HDF5 buffer chunk data from a set of managed buffers
1545  std::vector<HDF5bufferchunk> gather_all(GMPI::Comm& comm, const std::vector<HDF5MasterBuffer*>& masterbuffers, const std::map<std::string,int>& buf_ids);
1546 
1547  static constexpr double RAMlimit = 500; // MB; dump data if buffer size exceeds this
1548  static constexpr std::size_t MAXrecv = 100; // Maximum number of processes to send buffer data at one time
1549 
1550 #endif
1551 
1554 
      /// Master buffer objects of the auxiliary printers; only the primary printer is expected to fill this.
1557  std::vector<HDF5MasterBuffer*> aux_buffers;
1558 
1560  std::string get_filename(const Options& options);
1561 
1563  std::string get_groupname(const Options& options);
1564 
1566  std::size_t get_buffer_length(const Options& options);
1567 
1569  std::map<ulong, ulong> get_highest_PPIDs_from_HDF5();
1570 
1573  void check_consistency(bool attempt_repair);
1574 
1576  bool get_sync(const Options& options);
1577 
      /// Helper print function: hands the value straight to buffermaster.schedule_print<T>.
1579  // Used to reduce repetition in definitions of virtual function overloads
1580  // (useful since there is no automatic type conversion possible)
1581  template<class T>
1582  void basic_print(T const& value, const std::string& label, const unsigned int mpirank, const unsigned long pointID)
1583  {
1584  // Forward the print information on to the master buffer manager object
1585  buffermaster.schedule_print<T>(value,label,mpirank,pointID);
1586  }
1587 
1588  };
1589 
1590  // Register printer so it can be constructed via inifile instructions
1591  // First argument is string label for inifile access, second is class from which to construct printer
      // (LOAD_PRINTER is a macro from baseprinter.hpp; here the label "hdf5" selects HDF5Printer2.)
1592  LOAD_PRINTER(hdf5, HDF5Printer2)
1593 
1594  }
1595 }
1596 
1597 #endif
void ensure_dataset_is_open() const
Enforce that the dataset must be open for whatever follows (or else an error is thrown) ...
void schedule_print(T const &value, const std::string &label, const unsigned int mpirank, const unsigned long pointID)
Queue up data to be written to disk when buffers are full.
std::size_t write_vector(const hid_t loc_id, const std::vector< T > &data, const std::size_t target_pos, const bool force=false)
Write a vector of data to disk at the target position.
void add_int_block(const HDF5bufferchunk &chunk, const std::size_t buf)
Class to manage buffer for a single output label.
greatScanData data
Definition: great.cpp:38
HDF5MasterBufferT(bool sync)
Constructor.
The main printer class for output to HDF5 format.
void close_dataset()
Close dataset on disk and release handles.
void reset(hid_t loc_id)
Clear all data in the buffer and on disk. Only allowed for "random access" buffers.
EXPORT_SYMBOLS error & printer_error()
Printer errors.
HDF5Printer2 * primary_printer
Pointer to primary printer object.
void basic_print(T const &value, const std::string &label, const unsigned int mpirank, const unsigned long pointID)
Helper print function.
void open_dataset(hid_t location_id)
Open dataset on disk and obtain HDF5 handles.
HDF5DataSet< int > my_dataset_valid
std::string myname() const
Retrieve name of the dataset we are supposed to access.
std::vector< T > get_chunk(std::size_t offset, std::size_t length) const
Extract a data slice from the linked dataset.
HDF5MasterBufferT< int > hdf5_buffers_int
Buffer manager objects.
const int h5v2_bufdata_values(15)
void add_float_block(const HDF5bufferchunk &chunk, const std::size_t buf)
#define LOCAL_INFO
Definition: local_info.hpp:34
GAMBIT file locking functions Use these to block access to sensitive parts of the code by other proce...
bool get_exists_on_disk() const
Variable tracking whether the dataset is known to exist in the output file yet.
virtual void create_dataset(hid_t location_id)=0
Create a new dataset at the specified location (implemented in derived class since need to know the t...
std::map< std::string, HDF5BufferBase * > all_buffers
Map containing pointers to all buffers managed by this object.
Constructable class for doing basic operations on a HDF5 dataset.
std::pair< hid_t, hid_t > select_hyperslab(std::size_t offset, std::size_t length) const
Obtain memory and dataspace identifiers for writing to a hyperslab in the dataset.
HDF5DataSet< T > my_dataset
Object that provides an interface to the output HDF5 dataset matching this buffer.
HDF5DataSetBase(const std::string &name, const hid_t hdftype_id)
HDF5DataSetBase member functions.
Logging access header for GAMBIT.
HDF5MasterBuffer buffermaster
Object interfacing to HDF5 file and all datasets.
BOOST_PP_SEQ_FOR_EACH_I(GETTYPEID, _, PRINTABLE_TYPES) void printAllTypeIDs(void)
For debugging; print to stdout all the typeIDs for all types.
Definition: baseprinter.cpp:36
Definitions of new MPI datatypes needed by printers.
unsigned long long int ulonglong
const int h5v2_bufdata_type(14)
Declarations for the YAML options class.
std::pair< std::vector< long >, std::vector< int > > flush_to_vector_int(const std::vector< PPIDpair > &order)
HDF5Buffer(const std::string &name, const bool sync, const std::vector< PPIDpair > &buffered_points)
Constructor.
HDF5MasterBufferT< ulong > hdf5_buffers_ulong
const int h5v2_bufdata_points(11)
HDF5MasterBufferT< uint > hdf5_buffers_uint
std::set< PPIDpair > buffer_set
Set detailing what points are in the buffer.
hid_t get_hdftype_id() const
Retrieve the HDF5 type ID for this dataset.
std::size_t get_dset_length() const
Retrieve the current size of the dataset on disk.
void random_flush(const hid_t loc_id, const std::map< PPIDpair, std::size_t > &position_map)
Empty the buffer to disk as "random access" data at pre-existing positions matching the point IDs May...
bool dataset_exists(const hid_t loc_id)
Check if our dataset exists on disk with the required name at the given location. ...
Class for interfacing to a HDF5 dataset of fixed type.
std::set< PPIDpair > buffered_points_set
We also need a set of the buffered points, so we can do fast lookup of whether a point is in the buff...
void ensure_dataset_exists(const hid_t loc_id, const std::size_t length)
Make sure datasets exist on disk with the correct name and size.
const int h5v2_bufdata_ranks(12)
hid_t hdftype_id
HDF5 type ID for this dataset.
HDF5DataSet(const std::string &name)
Constructor.
HDF5Buffer< T > & get_buffer(const std::string &label, const std::vector< PPIDpair > &buffered_points)
Retrieve buffer of our type for a given label.
void update(const PPIDpair &ppid)
Make sure buffer includes the specified point (data will be set as 'invalid' unless given elsewhere) ...
std::vector< HDF5MasterBuffer * > aux_buffers
Vector of pointers to master buffer objects for auxiliary printers. Only the primary printer will hav...
void append(T const &value, const PPIDpair &ppid)
Insert data to print buffer at the specified point (overwrite if it already exists in the buffer) ...
std::pair< std::vector< double >, std::vector< int > > flush_to_vector_dbl(const std::vector< PPIDpair > &order)
const Logging::endofmessage EOM
Explicit const instance of the end of message struct in Gambit namespace.
Definition: logger.hpp:99
const int h5v2_BLOCK(30)
HDF5MasterBufferT< long > hdf5_buffers_long
bool synchronised
Flag to specify what sort of buffer this manager is supposed to be managing.
std::map< PPIDpair, int > buffer_valid
Buffer specifying whether the data in the primary buffer is "valid".
Utils::FileLock hdf5out
File locking object for the output hdf5 file.
Logging::LogMaster & logger()
Function to retrieve a reference to the Gambit global log object.
Definition: logger.cpp:95
const int h5v2_bufdata_valid(13)
const int h5v2_bufname(10)
MPI tags for HDF5 printer v2.
Declaration and definition of printer base class.
START_MODEL dNur_CMB r
void block_flush(const hid_t loc_id, const std::vector< PPIDpair > &order, const std::size_t target_pos)
Empty the buffer to disk as block with the specified order into the target position (only allowed if ...
bool is_open
Flag to let us know if the dataset is open.
HDF5MasterBufferT< float > hdf5_buffers_float
std::vector< PPIDpair > buffered_points
Vector of PPIDpairs that are currently stored in the printer buffers. This also defines the order in w...
int get_type_id() const
Retrieve the integer type ID for the buffered dataset.
bool synchronised
Flag to tell us whether this buffer should perform block writes to the output dataset, or look up and overwrite existing points.
const int h5v2_BEGIN(31)
#define LOAD_PRINTER(tag,...)
Definition: baseprinter.hpp:57
void create_dataset(hid_t location_id)
Create a new dataset at the specified location.
int type_id
Integer identifier for the template type of this dataset (determined by derived type) ...
long long values_int[NBUFFERS][SIZE]
unsigned long int ulong
std::map< std::string, HDF5Buffer< T > > my_buffers
A simple C++ wrapper for the MPI C bindings.
Base class for buffers.
bool exists_on_disk
Variable tracking whether the dataset is known to exist in the output file yet.
std::string _dset_name
Name of dataset for which this object is the buffer.
EXPORT_SYMBOLS warning & printer_warning()
Printer warnings.
void write_RA_buffer(const T(&buffer)[MAX_BUFFER_SIZE], const hsize_t(&coords)[MAX_BUFFER_SIZE], std::size_t npoints)
Write a buffer of data to disk at the specified positions (must be within current dataset extents) ...
std::string file
Output file variables.
std::string _myname
Name of the dataset in the hdf5 file.
Class to manage a set of buffers for a single output type.
bool exists_on_disk() const
Report whether the dataset for which we are the buffer exists on disk yet.
hid_t dset_id
HDF5 dataset identifier.
std::set< T > set_diff(const std::set< T > &set1, const std::set< T > &set2)
std::map< PPIDpair, T > buffer
Buffer containing points to be written to disk upon "flush".
virtual ~HDF5BufferBase()
Destructor.
hid_t get_dset_id() const
Retrieve the dataset ID for the currently open dataset.
#define DECLARE_PRINT(r, data, i, elem)
Sequence of all types printable by the HDF5 printer.
void ensure_dataset_exists(const hid_t loc_id, const std::size_t length)
Ensure that a correctly named dataset exists at the target location with the specified length...
void reset(hid_t loc_id)
Clear all data on disk for this dataset. Note: this just sets all values to defaults; it doesn't delete or resize the dataset.
void write_random(const hid_t loc_id, const std::map< std::size_t, T > &data)
Write data to disk at specified positions.
Base class for interfacing to a HDF5 dataset.
void write_buffer(const T(&buffer)[MAX_BUFFER_SIZE], const std::size_t length, const std::size_t target_pos, const bool force=false)
Write a block of data to disk at the end of the dataset This is the lower-level function.
A collection of tools for interacting with HDF5 databases.
#define HDF5_TYPES
Definition: hdf5types.hpp:25
#define HDF5_BACKEND_TYPES
Definition: hdf5types.hpp:41
Base template is left undefined in order to raise a compile error if specialisation doesn't exist...
Definition: hdf5tools.hpp:77
pointID / process number pair Used to identify a single parameter space point
TODO: see if we can use this one:
Definition: Analysis.hpp:33
A small wrapper object for 'options' nodes.
Class to manage all buffers for a given printer object. Also handles the file locking/access to the ou...
HDF5MasterBufferT< double > hdf5_buffers_double
virtual ~HDF5DataSetBase()
Destructor.
void extend_dset_to(const std::size_t new_size)
Extend dataset to the specified size, filling it with default values.
long long int longlong
std::size_t virtual_dset_length
Variable tracking size of dataset on disk.
Concatenation macros.
int get_type_id() const
Retrieve the integer type ID for this dataset.
std::size_t buffer_length
Max allowed size of buffer.
void extend_dset_by(const std::size_t extend_by)
Set the variable that tracks the (virtual) dataset size on disk.