gambit is hosted by Hepforge, IPPP Durham
GAMBIT  v1.5.0-2191-ga4742ac
a Global And Modular BSM Inference Tool
Gambit::PostProcessor Namespace Reference

Classes

class  PPDriver
 Driver class to handle the actual postprocessing tasks. More...
 
struct  PPOptions
 Options object for PPDriver. See the matching options in the PPDriver class for descriptions. More...
 

Functions

bool point_done (const ChunkSet done_chunks, size_t index)
 Helper function for performing resume-related tasks. More...
 
Chunk get_effective_chunk (const std::size_t total_length, const unsigned int rank, const unsigned int numtasks)
 Get 'effective' start and end positions for a processing batch, i.e. divide an integer range as evenly as possible over a given number of processes. More...
 
Chunk get_my_chunk (const std::size_t dset_length, const ChunkSet &done_chunks, const int rank, const int numtasks)
 Compute start/end indices for a given rank process, given previous "done_chunk" data. More...
 
ChunkSet get_done_points (const std::string &filebase)
 Read through resume data files and reconstruct which chunks of points have already been processed. More...
 
ChunkSet merge_chunks (const ChunkSet &)
 Simplify a ChunkSet by merging chunks which overlap. More...
 
void record_done_points (const ChunkSet &done_chunks, const Chunk &mydone, const std::string &filebase, unsigned int rank, unsigned int size)
 Write resume data files. These specify which chunks of points have been processed during this run. More...
 
const Chunk stopchunk = Chunk(0,0)
 
ChunkSet get_done_points (Gambit::Printers::BaseBaseReader &resume_reader)
 Read through resume data files and reconstruct which chunks of points have already been processed. More...
 

Function Documentation

◆ get_done_points() [1/2]

ChunkSet Gambit::PostProcessor::get_done_points ( const std::string &  filebase)

Read through resume data files and reconstruct which chunks of points have already been processed.

Definition at line 254 of file postprocessor_object.cpp.

References Chunk::end, Gambit::LogTags::err, Gambit::Utils::file_exists(), combine_hdf5::fin, LOCAL_INFO, merge_chunks(), Gambit::Scanner::scan_error(), and Chunk::start.

Referenced by scanner_plugin().

  {
    ChunkSet done_chunks;

    // First read collated chunk data from past resumes, and the number of processes used in the last run
    std::string inprev = filebase+"_prev.dat";

    // Check if it exists (it will not exist on the first resume)
    if(Utils::file_exists(inprev))
    {
      std::ifstream finprev(inprev);
      if(finprev)
      {
        unsigned int prev_size;
        finprev >> prev_size;
        Chunk nextchunk;
        while( finprev >> nextchunk.start >> nextchunk.end )
        {
          done_chunks.insert(nextchunk);
        }

        // Now read each of the chunk files left by each process during previous run
        for(unsigned int i=0; i<prev_size; ++i)
        {
          std::ostringstream inname;
          inname << filebase << "_" << i << ".dat";
          std::string in = inname.str();
          if(Utils::file_exists(in))
          {
            std::ifstream fin(in);
            if(fin)
            {
              fin >> nextchunk.start >> nextchunk.end;
              done_chunks.insert(nextchunk);
            }
            else
            {
              std::ostringstream err;
              err << "Tried to read postprocessor resume data from "<<in<<" but encountered a read error of some kind (the file seems to exist but appears to be unreadable)";
              Scanner::scan_error().raise(LOCAL_INFO,err.str());
            }
          }
          else
          {
            std::ostringstream err;
            err << "Tried to read postprocessor resume data from "<<in<<" but the file does not exist or is unreadable. We require this file because according to "<<inprev<<" there were "<<prev_size<<" processes in use during the last run, and we require the resume data from all of them";
            Scanner::scan_error().raise(LOCAL_INFO,err.str());
          }
        }
      }
      else
      {
        std::ostringstream err;
        err << "Tried to read postprocessor resume data from "<<inprev<<" but encountered a read error of some kind (the file seems to exist but appears to be unreadable)";
        Scanner::scan_error().raise(LOCAL_INFO,err.str());
      }
    }
    // Else there is no resume data, assume that this is a new run started without the --restart flag.
    return merge_chunks(done_chunks); // Simplify the chunks and return them
  }
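The resume-file format parsed above is simple: a "_prev.dat" file holds the previous process count on its first line, followed by one "start end" pair per line for each already-processed chunk. As a minimal sketch of that parsing (not the GAMBIT code itself; `Interval` and `parse_prev` are hypothetical stand-ins for `Chunk`/`ChunkSet` and the logic above, and a stringstream stands in for the file so the example is self-contained):

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <sstream>
#include <utility>

// Hypothetical stand-in for GAMBIT's Chunk: an inclusive (start, end) pair.
using Interval = std::pair<std::size_t, std::size_t>;

// Parse the "_prev.dat" layout: process count first, then "start end" pairs
// until end of input. Each pair becomes one done chunk.
std::set<Interval> parse_prev(std::istream& in, unsigned& prev_size)
{
    std::set<Interval> done;
    in >> prev_size;
    Interval c;
    while (in >> c.first >> c.second) done.insert(c);
    return done;
}
```

A stream containing "4\n0 99\n150 199\n" would yield a process count of 4 and two done chunks, [0,99] and [150,199].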

◆ get_done_points() [2/2]

ChunkSet Gambit::PostProcessor::get_done_points ( Gambit::Printers::BaseBaseReader &  resume_reader)

Read through any pre-existing output and reconstruct which chunks of points have already been processed.

Definition at line 79 of file postprocessor_object.cpp.

References Gambit::PostProcessor::PPOptions::add_to_logl, Gambit::PostProcessor::PPDriver::add_to_logl, Gambit::PostProcessor::PPOptions::all_params, Gambit::PostProcessor::PPDriver::all_params, Gambit::PostProcessor::PPDriver::check_settings(), combine_hdf5::chunksize, Gambit::PostProcessor::PPOptions::chunksize, Gambit::PostProcessor::PPOptions::cut_greater_than, Gambit::PostProcessor::PPDriver::cut_greater_than, Gambit::PostProcessor::PPOptions::cut_less_than, Gambit::PostProcessor::PPDriver::cut_less_than, Gambit::PostProcessor::PPOptions::data_labels, Gambit::PostProcessor::PPDriver::data_labels, Gambit::PostProcessor::PPOptions::data_labels_copy, Gambit::PostProcessor::PPDriver::data_labels_copy, Gambit::Utils::delimiterSplit(), Gambit::PostProcessor::PPOptions::discard_old_logl, Gambit::PostProcessor::PPDriver::discard_old_logl, Gambit::PostProcessor::PPOptions::discard_points_outside_cuts, Chunk::end, Gambit::Printers::BaseBaseReader::eoi(), Gambit::LogTags::err, Gambit::Printers::BaseBaseReader::get_next_point(), Gambit::Printers::BaseBaseReader::get_type(), Gambit::PostProcessor::PPDriver::getLogLike(), Gambit::PostProcessor::PPDriver::getPrinter(), Gambit::PostProcessor::PPDriver::getReader(), LOCAL_INFO, Gambit::PostProcessor::PPOptions::logl_purpose_name, Gambit::PostProcessor::PPDriver::logl_purpose_name, Gambit::PostProcessor::PPDriver::LogLike, merge_chunks(), Gambit::PostProcessor::PPDriver::new_params, Gambit::PostProcessor::PPDriver::PPDriver(), Gambit::PostProcessor::PPDriver::printer, Gambit::PostProcessor::PPOptions::rank, combine_hdf5::rank, Gambit::PostProcessor::PPDriver::reader, Gambit::PostProcessor::PPOptions::renaming_scheme, Gambit::PostProcessor::PPDriver::renaming_scheme, Gambit::Printers::BaseBaseReader::retrieve(), Gambit::PostProcessor::PPOptions::reweighted_loglike_name, Gambit::PostProcessor::PPDriver::reweighted_loglike_name, Gambit::PostProcessor::PPOptions::root, Gambit::Scanner::scan_error(), Chunk::start, 
Gambit::Utils::startsWith(), Gambit::PostProcessor::PPOptions::subtract_from_logl, Gambit::PostProcessor::PPDriver::subtract_from_logl, Gambit::PostProcessor::PPOptions::update_interval, Gambit::Logging::verbose, and Gambit::PostProcessor::PPOptions::verbose.

  {
    ChunkSet done_chunks;

    // Need to iterate through the pre-existing output and figure out what points it
    // has processed. We cannot tell what points were purposefully skipped (if the user
    // chose not to copy them into the output), but that shouldn't be a big deal since deciding
    // to skip a point doesn't cost much CPU, so we can just do it again.

    // We build up the set of "done" points as chunks.

    std::size_t previous_index = 0;
    bool building_chunk = false;
    std::size_t chunk_start;
    std::size_t chunk_end;
    while(not resume_reader.eoi()) // while not end of input
    {
      std::size_t input_index;
      bool is_valid = resume_reader.retrieve(input_index, "input_dataset_index");

      if(is_valid)
      {
        if(not building_chunk)
        {
          // Not building a chunk, and this point is valid, so start new (will be the first) chunk
          building_chunk = true;
          chunk_start = input_index;
        }
        else if(input_index==(previous_index+1))
        {
          // Point is just an increment by one, so still part of this chunk
          // Do nothing.
        }
        else if(input_index==previous_index)
        {
          // Reader didn't progress, error.
          std::ostringstream err;
          err << "'resume_reader' object returned the same value for 'input_dataset_index' twice ('"<<input_index<<"')! This means that it either didn't increment properly during this postprocessor run, or the input dataset contains the same point twice! Either case indicates a bug in the postprocessor, please report it.";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }
        else
        {
          // Non-incremental change in input_index! Could be higher or lower, either way, we
          // close the previous chunk and start a new one.
          chunk_end = previous_index;
          done_chunks.insert(Chunk(chunk_start,chunk_end));
          chunk_start = input_index;
        }

        previous_index = input_index;
      }

      resume_reader.get_next_point(); // Move reader to next previously processed point
    }
    // Need to close off last chunk
    if(building_chunk)
    {
      chunk_end = previous_index;
      done_chunks.insert(Chunk(chunk_start,chunk_end));
    }

    return merge_chunks(done_chunks); // Simplify the chunks and return them
  }
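The chunk-reconstruction loop above can be sketched standalone: walk a sequence of previously processed dataset indices and group consecutive runs into inclusive (start, end) intervals, closing the current interval on any non-incremental jump. This is a hedged re-implementation of the grouping logic only (the real code also raises an error on a repeated index, which this sketch omits); `Interval` and `chunks_from_indices` are hypothetical names, not GAMBIT API:

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <utility>
#include <vector>

// Hypothetical stand-in for GAMBIT's Chunk: an inclusive (start, end) pair.
using Interval = std::pair<std::size_t, std::size_t>;

// Group consecutive runs of indices into inclusive intervals. A jump in the
// sequence closes the current interval and opens a new one.
std::set<Interval> chunks_from_indices(const std::vector<std::size_t>& indices)
{
    std::set<Interval> done;
    bool building = false;
    std::size_t start = 0, prev = 0;
    for (std::size_t idx : indices)
    {
        if (!building)            { building = true; start = idx; }
        else if (idx != prev + 1) { done.insert({start, prev}); start = idx; }
        prev = idx;
    }
    if (building) done.insert({start, prev}); // close off the last chunk
    return done;
}
```

For example, the index stream 0,1,2,5,6,9 reconstructs to the chunks [0,2], [5,6], and [9,9].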

◆ get_effective_chunk()

Chunk Gambit::PostProcessor::get_effective_chunk ( const std::size_t  total_length,
const unsigned int  rank,
const unsigned int  numtasks 
)

Get 'effective' start and end positions for a processing batch, i.e. simply divide an integer range into the most even parts possible over a given number of processes.

Definition at line 52 of file postprocessor_object.cpp.

References r, and combine_hdf5::rank.

Referenced by get_my_chunk().

  {
    // Compute which points this process is supposed to process. Divide total
    // by number of MPI tasks.
    unsigned long long my_length = total_length / numtasks;
    unsigned long long r = total_length % numtasks;
    // Offset from beginning for this task assuming equal lengths in each task
    unsigned long long start = my_length * rank;
    // Divide up the remainder amongst the tasks and adjust offsets to account for these
    if(rank<r)
    {
      my_length++;
      start+=rank;
    }
    else
    {
      start+=r;
    }
    unsigned long long end = start + my_length - 1; // Minus 1 for the zero indexing
    return Chunk(start,end);
  }
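The division arithmetic above can be checked in isolation; a minimal sketch (a hypothetical `effective_chunk` helper returning a plain pair rather than a Chunk, otherwise the same arithmetic): the first `total_length % numtasks` ranks each receive one extra point, and later ranks shift their start accordingly.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>

// Split [0, total_length) into numtasks contiguous pieces whose sizes differ
// by at most one; the first 'remainder' ranks get one extra point.
// Returns an inclusive (start, end) range for the given rank.
std::pair<std::size_t, std::size_t> effective_chunk(std::size_t total_length,
                                                    unsigned rank,
                                                    unsigned numtasks)
{
    std::size_t len = total_length / numtasks;
    std::size_t r   = total_length % numtasks;
    // Equal-length offset, plus one extra slot for each earlier rank that got
    // a remainder point.
    std::size_t start = len * rank + (rank < r ? rank : r);
    if (rank < r) ++len;
    return {start, start + len - 1}; // minus 1 for zero indexing
}
```

For example, 10 points over 3 tasks gives rank 0 the range [0,3] and ranks 1 and 2 the ranges [4,6] and [7,9].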

◆ get_my_chunk()

Chunk Gambit::PostProcessor::get_my_chunk ( const std::size_t  dset_length,
const ChunkSet done_chunks,
const int  rank,
const int  numtasks 
)

Compute start/end indices for a given rank process, given previous "done_chunk" data.

First compute number of points left to process

Definition at line 75 of file postprocessor_object.cpp.

References combine_hdf5::dset_length, Chunk::eff_length, Chunk::end, Gambit::LogTags::err, get_effective_chunk(), Chunk::length(), LOCAL_INFO, Gambit::Scanner::scan_error(), and Chunk::start.

Referenced by Gambit::PostProcessor::PPDriver::run_main_loop().

  {
    // First compute number of points left to process
    std::size_t left_to_process = 0;
    std::size_t prev_chunk_end = 0;
    bool first_chunk = true;
    for(ChunkSet::const_iterator it=done_chunks.begin();
        it!=done_chunks.end(); ++it)
    {
      // total_done_length += it->length();
      // Whoops, cannot just add lengths, because done_chunks can overlap. Need to add up the actual gaps
      // between them
      long long int gap_size = it->start; // e.g. done_chunk starts at say '5';
      if(not first_chunk) gap_size -= (prev_chunk_end+1); // e.g. previous chunk finished at '1'; then gap_size is len(2,3,4) = 3 = 5 - 2. Unless no previous chunk, then gap_size is len(0,1,2,3,4) = 5.
      // std::cout << "Rank "<<rank<<": "<<"examining done_chunk ["<<it->start<<","<<it->end<<"]"<<std::endl;
      // std::cout << "Rank "<<rank<<": "<<"first? "<<first_chunk<<", prev_chunk_end = "<<prev_chunk_end<<std::endl;
      // std::cout << "Rank "<<rank<<": "<<"gap_size = " << gap_size <<std::endl;
      if(gap_size>0)
      {
        left_to_process += gap_size;
      }
      // Else the new done_chunk started before the previous done_chunk finished,
      // (they are ordered only based on the start index)
      // so we can skip it, or rather "merge" their lengths by just updating the
      // prev_chunk_end location if it has increased.
      if(first_chunk or it->end > prev_chunk_end)
      {
        first_chunk = false;
        prev_chunk_end = it->end;
      }
      //std::cout << "Rank "<<rank<<": "<<"left_to_process = " << left_to_process <<std::endl;
    }
    // ...and add gap from last done_chunk to the end of the dataset
    long long int last_gap_size = dset_length;
    if(not first_chunk) last_gap_size -= (prev_chunk_end+1); // e.g. dataset ends at 9 (length 10); previous chunk finished at 6; last_gap_size = len(7,8,9) = 3 = 10 - (6+1)
    //std::cout << "Rank "<<rank<<": "<<"dset_length = " << dset_length <<std::endl;
    //std::cout << "Rank "<<rank<<": "<<"last_gap_size = " << last_gap_size <<std::endl;
    if(last_gap_size>0)
    {
      left_to_process += last_gap_size;
    }
    //std::cout << "Rank "<<rank<<": "<<"left_to_process = " << left_to_process <<std::endl;
    // Done! Sanity check.
    if(left_to_process > dset_length)
    {
      std::ostringstream err;
      err << "Rank "<<rank<<" chunk calculation encountered nonsense! Computed number of points left to process ("<<left_to_process<<") is greater than the actual dataset length ("<<dset_length<<")! This is a bug in the postprocessor, please report it." <<std::endl;
      Scanner::scan_error().raise(LOCAL_INFO,err.str());
    }
    // if(rank==0) std::cout << "left_to_process = " << left_to_process;
    // Get 'effective' start/end positions for this rank; i.e. what the start index would be if the 'done' points were removed.
    Chunk eff_chunk = get_effective_chunk(left_to_process, rank, numtasks);

    // Convert effective chunk to real dataset indices (i.e. add in the 'skipped' indices)
    std::size_t count = 0;
    Chunk realchunk;
    realchunk.eff_length = eff_chunk.length(); // Record real number of points that will be processed from this chunk
    //std::cout << "Rank "<<rank<<": Converting to real dataset indices..." <<std::endl;
    prev_chunk_end = 0; // Reset
    first_chunk = true; // Reset
    bool found_start = false;
    bool found_end = false;
    //std::cout << "Rank "<<rank<<": "<<"Computing real dataset indices..." <<std::endl;
    for(ChunkSet::const_iterator it=done_chunks.begin();
        it!=done_chunks.end(); ++it)
    {
      // Need to add up the size of the gaps between chunks until we exceed the "effective" start/end positions,
      // then get the real indices by measuring back from the start of the done_chunk we are up to.
      //std::cout << "Rank "<<rank<<": Getting next done_chunk ["<<it->start<<","<<it->end<<"]"<<std::endl;
      long long int gap_size = it->start; // e.g. done_chunk starts at say '5';
      if(not first_chunk) gap_size -= (prev_chunk_end+1); // e.g. previous chunk finished at '1'; then gap_size is len(2,3,4) = 3 = 5 - 2. Unless no previous chunk, then gap_size is len(0,1,2,3,4) = 5.
      //std::cout << "Rank "<<rank<<": "<<"examining done_chunk ["<<it->start<<","<<it->end<<"]"<<std::endl;
      //std::cout << "Rank "<<rank<<": "<<"first? "<<first_chunk<<", prev_chunk_end = "<<prev_chunk_end<<std::endl;
      //std::cout << "Rank "<<rank<<": "<<"gap_size = " << gap_size <<std::endl;
      if(gap_size>0)
      {
        count += gap_size;
        //std::cout << "Rank "<<rank<<": count = "<<count<<" (added gap of size "<<gap_size<<"; done_chunk.start="<<it->start<<" - prev_chunk_end="<<prev_chunk_end<<")"<<std::endl;
        //std::cout << "Rank "<<rank<<": "<<"count = " << count <<std::endl;
        //std::cout << "Rank "<<rank<<": "<<"eff_chunk.start = " << eff_chunk.start <<std::endl;
        if(not found_start and count > eff_chunk.start)
        {
          std::size_t overshoot = count - eff_chunk.start; // If count is 3 and our chunk is supposed to start at the first 'not done' point (index 0), we have overshot by 3 - 0 = 3 positions.
          realchunk.start = it->start - overshoot; // So our start point is 5 - 3 = 2
          //std::cout << "Rank "<<rank<<": "<<"start overshoot = " << overshoot <<std::endl;
          //std::cout << "Rank "<<rank<<": "<<"realchunk.start = " << realchunk.start <<std::endl;
          //std::cout << "Rank "<<rank<<": found start of chunk! realchunk.start = "<<realchunk.start<<", eff_chunk.start = "<<eff_chunk.start<<", overshoot = "<<overshoot<<std::endl;
          found_start = true;
        }
        if(not found_end and count > eff_chunk.end)
        {
          std::size_t overshoot = count - eff_chunk.end; // Suppose our chunk should also end on the first 'not done' point (i.e. we have only one point assigned). Then we have the same calculation as above for the end.
          realchunk.end = it->start - overshoot;
          //std::cout << "Rank "<<rank<<": "<<"end overshoot = " << overshoot <<std::endl;
          //std::cout << "Rank "<<rank<<": "<<"realchunk.end = " << realchunk.end <<std::endl;
          found_end = true;
          //std::cout << "Rank "<<rank<<": found end of chunk! realchunk.end = "<<realchunk.end<<", eff_chunk.end = "<<eff_chunk.end<<", overshoot = "<<overshoot<<std::endl;
          break;
        }
      }
      // Else the new done_chunk started before the previous done_chunk finished,
      // (they are ordered only based on the start index)
      // so we can skip it, or rather "merge" their lengths by just updating the
      // prev_chunk_end location if it has increased.
      if(first_chunk or it->end > prev_chunk_end)
      {
        first_chunk = false;
        prev_chunk_end = it->end;
      }

      //std::cout << "Rank "<<rank<<": set prev_chunk_end to "<<prev_chunk_end<<std::endl;
    }
    // If the chunk we need to process starts or finishes after the last done chunk,
    // then we won't have found the index yet. Need to measure from the end of the
    // dataset.
    if(not found_start or not found_end)
    {
      long long int last_gap_size = dset_length;
      if(not first_chunk) last_gap_size -= (prev_chunk_end+1); // e.g. dataset ends at 9 (length 10); previous chunk finished at 6; last_gap_size = len(7,8,9) = 3 = 10 - (6+1)
      if(last_gap_size<0)
      {
        std::ostringstream err;
        err << "Rank "<<rank<<" chunk calculation encountered nonsense! Size of gap between last 'done_chunk' and the end of the dataset was computed as less than zero! ("<<last_gap_size<<" = dset_length("<<dset_length<<") - prev_chunk_end("<<prev_chunk_end<<")). This is a bug in the postprocessor, please report it." <<std::endl;
        Scanner::scan_error().raise(LOCAL_INFO,err.str());
      }
      count += last_gap_size;
      //std::cout << "Rank "<<rank<<": count = "<<count<<" (added LAST gap of size "<<last_gap_size<<"; dset_length="<<dset_length<<" - prev_chunk_end="<<prev_chunk_end<<")"<<std::endl;
      if(not found_start)
      {
        std::size_t overshoot = count - eff_chunk.start; // ok so from above count=3, say. Suppose eff_chunk.start=0. overshoot=3
        realchunk.start = dset_length - overshoot; // Then we want to start at index 7 = 10 - 3
        //std::cout << "Rank "<<rank<<": "<<"final start overshoot = " << overshoot <<std::endl;
        //std::cout << "Rank "<<rank<<": "<<"realchunk.start = " << realchunk.start <<std::endl;
        found_start = true;
      }
      if(not found_end)
      {
        std::size_t overshoot = count - eff_chunk.end;
        realchunk.end = dset_length - overshoot;
        found_end = true;
        //std::cout << "Rank "<<rank<<": "<<"final end overshoot = " << overshoot <<std::endl;
        //std::cout << "Rank "<<rank<<": "<<"realchunk.end = " << realchunk.end <<std::endl;
      }
    }
    // Basic sanity checks
    if(realchunk.start >= dset_length)
    {
      std::ostringstream err;
      err << "Rank "<<rank<<" chunk calculation returned nonsense! Assigned start of chunk ("<<realchunk.start<<") exceeds length of dataset ("<<dset_length<<") (end of chunk was "<<realchunk.end<<"). This is a bug in the postprocessor, please report it." <<std::endl;
      Scanner::scan_error().raise(LOCAL_INFO,err.str());
    }
    if(realchunk.end >= dset_length)
    {
      std::ostringstream err;
      err << "Rank "<<rank<<" chunk calculation returned nonsense! Assigned end of chunk ("<<realchunk.end<<") exceeds length of dataset ("<<dset_length<<") (start of chunk was "<<realchunk.start<<"). This is a bug in the postprocessor, please report it." <<std::endl;
      Scanner::scan_error().raise(LOCAL_INFO,err.str());
    }
    // Final sanity checks
    // Make sure the new chunk of assigned points doesn't start or end on a "done" point!
    // Comment out for speed once debugging done
    for(ChunkSet::const_iterator it=done_chunks.begin();
        it!=done_chunks.end(); ++it)
    {
      if( it->end==realchunk.start
          or it->end==realchunk.end
          or it->start==realchunk.start
          or it->start==realchunk.end)
      {
        std::ostringstream err;
        err << "Rank "<<rank<<" chunk calculation returned nonsense! The assigned chunk start or end point is already listed as 'done'! This is a bug in the postprocessor, please report it. Debug output:" <<std::endl;
        err << "Assigned chunk: ["<<realchunk.start << ", " <<realchunk.end<<"]"<<std::endl;
        err << "Conflicting done_chunk: ["<<it->start << ", " <<it->end<<"]"<<std::endl;
        Scanner::scan_error().raise(LOCAL_INFO,err.str());
      }
    }
    return realchunk;
  }
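The first pass of the function above (counting unprocessed points by summing the gaps between possibly overlapping done chunks) can be sketched in isolation. This is a hedged re-implementation of only that gap-counting logic, with `Interval` and `left_to_process` as hypothetical stand-ins for the GAMBIT types and variables:

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <utility>

// Hypothetical stand-in for GAMBIT's Chunk: an inclusive (start, end) pair,
// ordered by start, possibly overlapping.
using Interval = std::pair<std::size_t, std::size_t>;

// Count the dataset indices not covered by any done interval, by summing the
// gap before each interval and the gap after the last one. Overlaps are
// handled by only advancing prev_end when an interval extends past it.
std::size_t left_to_process(const std::set<Interval>& done, std::size_t dset_length)
{
    std::size_t left = 0, prev_end = 0;
    bool first = true;
    for (const Interval& iv : done)
    {
        long long gap = static_cast<long long>(iv.first);
        if (!first) gap -= static_cast<long long>(prev_end) + 1;
        if (gap > 0) left += static_cast<std::size_t>(gap);
        if (first || iv.second > prev_end) { first = false; prev_end = iv.second; }
    }
    // ...and the gap from the last done interval to the end of the dataset.
    long long last = static_cast<long long>(dset_length);
    if (!first) last -= static_cast<long long>(prev_end) + 1;
    if (last > 0) left += static_cast<std::size_t>(last);
    return left;
}
```

For a dataset of length 10 with done intervals [0,2] and [5,6], the remaining points are {3,4,7,8,9}, so the count is 5; overlapping intervals such as [0,3] and [2,5] are not double-counted.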

◆ merge_chunks()

ChunkSet Gambit::PostProcessor::merge_chunks ( const ChunkSet &  input_chunks)

Simplify a ChunkSet by merging chunks which overlap (or are directly adjacent).

Definition at line 316 of file postprocessor_object.cpp.

References Chunk::end, Gambit::LogTags::err, LOCAL_INFO, Gambit::Scanner::scan_error(), and Chunk::start.

Referenced by get_done_points().

  {
    ChunkSet merged_chunks;
    if(input_chunks.size()>0)
    {
      Chunk new_chunk;
      std::size_t prev_chunk_end = 0;
      new_chunk.start = input_chunks.begin()->start; // Start of first chunk
      for(ChunkSet::const_iterator it=input_chunks.begin();
          it!=input_chunks.end(); ++it)
      {
        if(prev_chunk_end!=0 and it->start > prev_chunk_end)
        {
          // Gap detected; close the existing chunk and start a new one.
          new_chunk.end = prev_chunk_end;
          merged_chunks.insert(new_chunk);
          new_chunk.start = it->start;
        }

        if(it->end > prev_chunk_end)
        {
          prev_chunk_end = it->end;
        }
      }
      // No more chunks, close the last open chunk
      new_chunk.end = prev_chunk_end;
      merged_chunks.insert(new_chunk);
      // Sanity check; starts and ends of merged chunks should match some start/end in the input chunks
      for(ChunkSet::const_iterator it=merged_chunks.begin();
          it!=merged_chunks.end(); ++it)
      {
        bool found_start = false;
        bool found_end = false;
        for(ChunkSet::const_iterator jt=input_chunks.begin();
            jt!=input_chunks.end(); ++jt)
        {
          if(it->start==jt->start) found_start = true;
          if(it->end==jt->end) found_end = true;
        }
        if(not found_start or not found_end)
        {
          std::ostringstream err;
          err << "Error, merged 'done_chunks' are not consistent with the originally input done_chunks! This indicates a bug in the merge_chunks routine of the postprocessor, please report it. Debug output:" << endl;
          err << "Problem merged chunk was ["<<it->start<<","<<it->end<<"]"<<endl;
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }
        // else fine, move to next merged chunk
      }
    }
    // else there are no input chunks, just return an empty ChunkSet
    return merged_chunks;
  }
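The interval-merging idea above can be sketched with plain standard-library types; a hedged re-implementation (not GAMBIT's code; `Interval`/`IntervalSet` and `merge_intervals` are hypothetical stand-ins for `Chunk`/`ChunkSet` and `merge_chunks`, without the sanity-check pass):

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <utility>

// Hypothetical stand-ins for GAMBIT's Chunk/ChunkSet: inclusive (start, end)
// pairs, ordered by start.
using Interval = std::pair<std::size_t, std::size_t>;
using IntervalSet = std::set<Interval>;

// Merge intervals that overlap or are directly adjacent (end + 1 == next start).
IntervalSet merge_intervals(const IntervalSet& in)
{
    IntervalSet out;
    if (in.empty()) return out;
    Interval cur = *in.begin();
    for (const Interval& iv : in)
    {
        if (iv.first > cur.second + 1)   // gap: close the current interval
        {
            out.insert(cur);
            cur = iv;
        }
        else if (iv.second > cur.second) // overlap/adjacent: extend it
        {
            cur.second = iv.second;
        }
    }
    out.insert(cur);                     // close the last open interval
    return out;
}
```

For example, the set {[0,3], [2,5], [7,9]} merges to {[0,5], [7,9]}, and the adjacent pair {[0,3], [4,6]} collapses to {[0,6]}.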

◆ point_done()

bool Gambit::PostProcessor::point_done ( const ChunkSet  done_chunks,
size_t  index 
)

Helper function for performing resume-related tasks: answers queries as to whether a given dataset index has been postprocessed in a previous run or not.

Definition at line 34 of file postprocessor_object.cpp.

  {
    bool answer = false;
    for(ChunkSet::const_iterator it=done_chunks.begin();
        it!=done_chunks.end(); ++it)
    {
      if(it->iContain(index))
      {
        answer = true;
        break;
      }
    }
    return answer;
  }
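The membership test above is a linear scan over the done chunks; a minimal self-contained sketch of the same check (with `Interval` as a hypothetical stand-in for `Chunk`, and an inline range test standing in for `Chunk::iContain`):

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <utility>

// Hypothetical stand-in for GAMBIT's Chunk: an inclusive (start, end) pair.
using Interval = std::pair<std::size_t, std::size_t>;

// A dataset index is "done" if it falls inside any previously processed
// interval (bounds inclusive, mirroring Chunk::iContain).
bool point_done_sketch(const std::set<Interval>& done, std::size_t index)
{
    for (const Interval& iv : done)
        if (index >= iv.first && index <= iv.second) return true;
    return false;
}
```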

◆ record_done_points()

void Gambit::PostProcessor::record_done_points ( const ChunkSet done_chunks,
const Chunk mydone,
const std::string &  filebase,
unsigned int  rank,
unsigned int  size 
)

Write resume data files. These specify which chunks of points have been processed during this run.

Definition at line 371 of file postprocessor_object.cpp.

References Chunk::end, Gambit::LogTags::err, Gambit::Utils::file_exists(), combine_hdf5::fout, LOCAL_INFO, Gambit::Scanner::scan_error(), and Chunk::start.

Referenced by Gambit::PostProcessor::PPDriver::run_main_loop().

  {
    if(rank == 0)
    {
      // If we are rank 0, output any old chunks from previous resumes to a special file
      // (deleting it first)
      std::string outprev = filebase+"_prev.dat";
      if( Gambit::Utils::file_exists(outprev) )
      {
        if( remove(outprev.c_str()) != 0 )
        {
          perror( ("Error deleting file "+outprev).c_str() );
          std::ostringstream err;
          err << "Unknown error removing old resume data file '"<<outprev<<"'!";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }
      }
      // else was deleted no problem
      std::ofstream foutprev(outprev);
      foutprev << size << std::endl;
      for(ChunkSet::const_iterator it=done_chunks.begin();
          it!=done_chunks.end(); ++it)
      {
        foutprev << it->start << " " << it->end << std::endl;
      }
      // check that the write succeeded
      foutprev.close();
      if (!foutprev)
      {
        std::ostringstream err;
        err << "Unknown IO error while writing resume data file '"<<outprev<<"'!";
        Scanner::scan_error().raise(LOCAL_INFO,err.str());
      }
    }
    // Now output what we have done (could overlap with old chunks, but that doesn't really matter)
    std::ostringstream outname;
    outname << filebase << "_" << rank <<".dat";
    std::string out = outname.str();
    if( Gambit::Utils::file_exists(out) )
    {
      if( remove(out.c_str()) != 0 )
      {
        perror( ("Error deleting file "+out).c_str() );
        std::ostringstream err;
        err << "Unknown error removing old resume data file '"<<out<<"'!";
        Scanner::scan_error().raise(LOCAL_INFO,err.str());
      }
    }
    // else was deleted no problem, write new file
    std::ofstream fout(out);
    fout << mydone.start << " " << mydone.end << std::endl;
    // let's just make sure the files had no errors while closing because they are important.
    fout.close();
    if (!fout)
    {
      std::ostringstream err;
      err << "Unknown IO error while writing resume data file '"<<out<<"'!";
      Scanner::scan_error().raise(LOCAL_INFO,err.str());
    }
    // Gah, data could apparently still be buffered by the OS and not yet written to disk
    // Apparently on POSIX fsync can be used to ensure this happens, but I am not
    // sure if the following works. This answer on StackOverflow seems to say it doesn't?
    // http://stackoverflow.com/questions/676787/how-to-do-fsync-on-an-ofstream
    //int fd = open(filename, O_APPEND);
    //fsync(fd);
    //close(fd);
    // I may need to convert all these operations to old-school C operations
  }
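The flushing concern raised in the closing comment (data buffered by the OS after `std::ofstream::close`) can indeed be addressed with C stdio plus POSIX `fsync`, as the comment contemplates. A hedged, POSIX-only sketch; `write_chunk_synced` and its path are hypothetical, not part of GAMBIT:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>    // std::fopen, std::fprintf, std::fflush, std::fclose
#include <fcntl.h>   // POSIX: open flags (not used here, kept for context)
#include <unistd.h>  // POSIX: fsync

// Write one "start end" resume record and force it to the storage device:
// flush the stdio buffer to the kernel, then fsync the underlying descriptor
// (obtained via fileno) before closing. Returns true on success.
bool write_chunk_synced(const char* path, std::size_t start, std::size_t end)
{
    std::FILE* f = std::fopen(path, "w");
    if (!f) return false;
    std::fprintf(f, "%zu %zu\n", start, end);
    bool ok = (std::fflush(f) == 0)      // stdio buffer -> kernel
              && (fsync(fileno(f)) == 0); // kernel buffer -> disk
    ok = (std::fclose(f) == 0) && ok;
    return ok;
}
```

Note that fully durable writes would also require fsyncing the containing directory after creating the file; this sketch only covers the file data itself.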

Variable Documentation

◆ stopchunk

const Chunk Gambit::PostProcessor::stopchunk = Chunk(0,0)

Definition at line 57 of file postprocessor.hpp.

Referenced by scanner_plugin().