// GAMBIT: Global and Modular BSM Inference Tool
// *********************************************
///  \file postprocessor_object.cpp
///
///  "Postprocessing" scanner plugin: definitions
///  for the PPDriver object and its helper functions.
///
// *********************************************

// (The include block was collapsed in the original listing. The code below
// relies on standard headers such as <algorithm>, <fstream>, <iostream>,
// <sstream>, <map>, <set>, <string>, <unordered_map> and <vector>, plus the
// GAMBIT postprocessor, printer and utility headers from the repository.)
namespace Gambit
{
  namespace PostProcessor
  {

    /// Helper functions for performing resume-related tasks
    /// Check whether a given dataset index falls within any of the 'done' chunks
    bool point_done(const ChunkSet done_chunks, size_t index)
    {
      bool answer = false;
      for(ChunkSet::const_iterator it=done_chunks.begin();
          it!=done_chunks.end(); ++it)
      {
        if(it->iContain(index))
        {
          answer = true;
          break;
        }
      }
      return answer;
    }

    /// Get 'effective' start and end positions for a processing batch,
    /// i.e. the indices this rank would be assigned if the 'done' points
    /// were removed from the dataset.
    Chunk get_effective_chunk(const std::size_t total_length, const unsigned int rank, const unsigned int numtasks)
    {
      // Compute which points this process is supposed to process. Divide total
      // by number of MPI tasks.
      unsigned long long my_length = total_length / numtasks;
      unsigned long long r = total_length % numtasks;
      // Offset from beginning for this task, assuming equal lengths in each task
      unsigned long long start = my_length * rank;
      // Divide the remainder amongst the tasks and adjust the offsets to account for this
      if(rank<r)
      {
        my_length++;
        start+=rank;
      }
      else
      {
        start+=r;
      }
      unsigned long long end = start + my_length - 1; // Minus 1 for the zero indexing
      return Chunk(start,end);
    }

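    // Worked example (illustrative only): dividing total_length=10 points
    // among numtasks=3 ranks gives my_length=3 with remainder r=1, so:
    //   rank 0: rank<r, so length 4, start 0  -> Chunk(0,3)
    //   rank 1: length 3, start 3+1 = 4       -> Chunk(4,6)
    //   rank 2: length 3, start 6+1 = 7       -> Chunk(7,9)
    // Every index is covered exactly once, and batch sizes differ by at most one.
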
    /// Compute the start/end dataset indices for a given rank process, given
    /// the previous 'done_chunk' data.
    Chunk get_my_chunk(const std::size_t dset_length, const ChunkSet& done_chunks, const int rank, const int numtasks)
    {
      // Compute how many points are left to process
      std::size_t left_to_process = 0;
      std::size_t prev_chunk_end = 0;
      bool first_chunk = true;
      for(ChunkSet::const_iterator it=done_chunks.begin();
          it!=done_chunks.end(); ++it)
      {
        // total_done_length += it->length();
        // Whoops, cannot just add lengths, because done_chunks can overlap. Need to add up the actual gaps
        // between them.
        long long int gap_size = it->start; // e.g. done_chunk starts at, say, '5';
        if(not first_chunk) gap_size -= (prev_chunk_end+1); // e.g. previous chunk finished at '1'; then gap_size is len(2,3,4) = 3 = 5 - 2. Unless there is no previous chunk, in which case gap_size is len(0,1,2,3,4) = 5.
        // std::cout << "Rank "<<rank<<": "<<"examining done_chunk ["<<it->start<<","<<it->end<<"]"<<std::endl;
        // std::cout << "Rank "<<rank<<": "<<"first? "<<first_chunk<<", prev_chunk_end = "<<prev_chunk_end<<std::endl;
        // std::cout << "Rank "<<rank<<": "<<"gap_size = " << gap_size <<std::endl;
        if(gap_size>0)
        {
          left_to_process += gap_size;
        }
        // Else the new done_chunk started before the previous done_chunk finished
        // (they are ordered only by their start index),
        // so we can skip it, or rather "merge" their lengths by just updating the
        // prev_chunk_end location if it has increased.
        if(first_chunk or it->end > prev_chunk_end)
        {
          first_chunk = false;
          prev_chunk_end = it->end;
        }
        //std::cout << "Rank "<<rank<<": "<<"left_to_process = " << left_to_process <<std::endl;
      }
      // ...and add the gap from the last done_chunk to the end of the dataset
      long long int last_gap_size = dset_length;
      if(not first_chunk) last_gap_size -= (prev_chunk_end+1); // e.g. dataset ends at 9 (length 10); previous chunk finished at 6; last_gap_size = len(7,8,9) = 3 = 10 - (6+1)
      //std::cout << "Rank "<<rank<<": "<<"dset_length = " << dset_length <<std::endl;
      //std::cout << "Rank "<<rank<<": "<<"last_gap_size = " << last_gap_size <<std::endl;
      if(last_gap_size>0)
      {
        left_to_process += last_gap_size;
      }
      //std::cout << "Rank "<<rank<<": "<<"left_to_process = " << left_to_process <<std::endl;
      // Done! Sanity check.
      if(left_to_process > dset_length)
      {
        std::ostringstream err;
        err << "Rank "<<rank<<" chunk calculation encountered nonsense! Computed number of points left to process ("<<left_to_process<<") is greater than the actual dataset length ("<<dset_length<<")! This is a bug in the postprocessor, please report it." <<std::endl;
        Scanner::scan_error().raise(LOCAL_INFO,err.str());
      }
      // if(rank==0) std::cout << "left_to_process = " << left_to_process;
      // Get 'effective' start/end positions for this rank, i.e. what the start index would be if the 'done' points were removed.
      Chunk eff_chunk = get_effective_chunk(left_to_process, rank, numtasks);

      // Convert the effective chunk to real dataset indices (i.e. add back in the 'skipped' indices)
      std::size_t count = 0;
      Chunk realchunk;
      realchunk.eff_length = eff_chunk.length(); // Record the real number of points that will be processed from this chunk
      //std::cout << "Rank "<<rank<<": Converting to real dataset indices..." <<std::endl;
      prev_chunk_end = 0; // Reset
      first_chunk = true; // Reset
      bool found_start = false;
      bool found_end = false;
      //std::cout << "Rank "<<rank<<": "<<"Computing real dataset indices..." <<std::endl;
      for(ChunkSet::const_iterator it=done_chunks.begin();
          it!=done_chunks.end(); ++it)
      {
        // Need to add up the size of the gaps between chunks until we exceed the "effective" start/end positions,
        // then get the real indices by measuring back from the start of the done_chunk we are up to.
        //std::cout << "Rank "<<rank<<": Getting next done_chunk ["<<it->start<<","<<it->end<<"]"<<std::endl;
        long long int gap_size = it->start; // e.g. done_chunk starts at, say, '5';
        if(not first_chunk) gap_size -= (prev_chunk_end+1); // e.g. previous chunk finished at '1'; then gap_size is len(2,3,4) = 3 = 5 - 2. Unless there is no previous chunk, in which case gap_size is len(0,1,2,3,4) = 5.
        //std::cout << "Rank "<<rank<<": "<<"examining done_chunk ["<<it->start<<","<<it->end<<"]"<<std::endl;
        //std::cout << "Rank "<<rank<<": "<<"first? "<<first_chunk<<", prev_chunk_end = "<<prev_chunk_end<<std::endl;
        //std::cout << "Rank "<<rank<<": "<<"gap_size = " << gap_size <<std::endl;
        if(gap_size>0)
        {
          count += gap_size;
          //std::cout << "Rank "<<rank<<": count = "<<count<<" (added gap of size "<<gap_size<<"; done_chunk.start="<<it->start<<" - prev_chunk_end="<<prev_chunk_end<<")"<<std::endl;
          //std::cout << "Rank "<<rank<<": "<<"count = " << count <<std::endl;
          //std::cout << "Rank "<<rank<<": "<<"eff_chunk.start = " << eff_chunk.start <<std::endl;
          if(not found_start and count > eff_chunk.start)
          {
            std::size_t overshoot = count - eff_chunk.start; // If count is 3 and our chunk is supposed to start at the first 'not done' point (index 0), we have overshot by 3 - 0 = 3 positions.
            realchunk.start = it->start - overshoot; // So our start point is 5 - 3 = 2
            //std::cout << "Rank "<<rank<<": "<<"start overshoot = " << overshoot <<std::endl;
            //std::cout << "Rank "<<rank<<": "<<"realchunk.start = " << realchunk.start <<std::endl;
            //std::cout << "Rank "<<rank<<": found start of chunk! realchunk.start = "<<realchunk.start<<", eff_chunk.start = "<<eff_chunk.start<<", overshoot = "<<overshoot<<std::endl;
            found_start = true;
          }
          if(not found_end and count > eff_chunk.end)
          {
            std::size_t overshoot = count - eff_chunk.end; // Suppose our chunk should also end on the first 'not done' point (i.e. we have only one point assigned). Then we have the same calculation as above for the end.
            realchunk.end = it->start - overshoot;
            //std::cout << "Rank "<<rank<<": "<<"end overshoot = " << overshoot <<std::endl;
            //std::cout << "Rank "<<rank<<": "<<"realchunk.end = " << realchunk.end <<std::endl;
            found_end = true;
            //std::cout << "Rank "<<rank<<": found end of chunk! realchunk.end = "<<realchunk.end<<", eff_chunk.end = "<<eff_chunk.end<<", overshoot = "<<overshoot<<std::endl;
            break;
          }
        }
        // Else the new done_chunk started before the previous done_chunk finished
        // (they are ordered only by their start index),
        // so we can skip it, or rather "merge" their lengths by just updating the
        // prev_chunk_end location if it has increased.
        if(first_chunk or it->end > prev_chunk_end)
        {
          first_chunk = false;
          prev_chunk_end = it->end;
        }

        //std::cout << "Rank "<<rank<<": set prev_chunk_end to "<<prev_chunk_end<<std::endl;
      }
      // If the chunk we need to process starts or finishes after the last done chunk,
      // then we won't have found the index yet. Need to measure from the end of the
      // dataset.
      if(not found_start or not found_end)
      {
        long long int last_gap_size = dset_length;
        if(not first_chunk) last_gap_size -= (prev_chunk_end+1); // e.g. dataset ends at 9 (length 10); previous chunk finished at 6; last_gap_size = len(7,8,9) = 3 = 10 - (6+1)
        if(last_gap_size<0)
        {
          std::ostringstream err;
          err << "Rank "<<rank<<" chunk calculation encountered nonsense! Size of the gap between the last 'done_chunk' and the end of the dataset was computed as less than zero! ("<<last_gap_size<<" = dset_length("<<dset_length<<") - prev_chunk_end("<<prev_chunk_end<<")). This is a bug in the postprocessor, please report it." <<std::endl;
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }
        count += last_gap_size;
        //std::cout << "Rank "<<rank<<": count = "<<count<<" (added LAST gap of size "<<last_gap_size<<"; dset_length="<<dset_length<<" - prev_chunk_end="<<prev_chunk_end<<")"<<std::endl;
        if(not found_start)
        {
          std::size_t overshoot = count - eff_chunk.start; // From the example above, count=3, say. Suppose eff_chunk.start=0; then overshoot=3.
          realchunk.start = dset_length - overshoot; // Then we want to start at index 7 = 10 - 3
          //std::cout << "Rank "<<rank<<": "<<"final start overshoot = " << overshoot <<std::endl;
          //std::cout << "Rank "<<rank<<": "<<"realchunk.start = " << realchunk.start <<std::endl;
          found_start = true;
        }
        if(not found_end)
        {
          std::size_t overshoot = count - eff_chunk.end;
          realchunk.end = dset_length - overshoot;
          found_end = true;
          //std::cout << "Rank "<<rank<<": "<<"final end overshoot = " << overshoot <<std::endl;
          //std::cout << "Rank "<<rank<<": "<<"realchunk.end = " << realchunk.end <<std::endl;
        }
      }
      // Basic sanity checks
      if(realchunk.start >= dset_length)
      {
        std::ostringstream err;
        err << "Rank "<<rank<<" chunk calculation returned nonsense! Assigned start of chunk ("<<realchunk.start<<") exceeds the length of the dataset ("<<dset_length<<") (end of chunk was "<<realchunk.end<<"). This is a bug in the postprocessor, please report it." <<std::endl;
        Scanner::scan_error().raise(LOCAL_INFO,err.str());
      }
      if(realchunk.end >= dset_length)
      {
        std::ostringstream err;
        err << "Rank "<<rank<<" chunk calculation returned nonsense! Assigned end of chunk ("<<realchunk.end<<") exceeds the length of the dataset ("<<dset_length<<") (start of chunk was "<<realchunk.start<<"). This is a bug in the postprocessor, please report it." <<std::endl;
        Scanner::scan_error().raise(LOCAL_INFO,err.str());
      }
      // Final sanity checks:
      // make sure the new chunk of assigned points doesn't start or end on a "done" point!
      // (Comment out for speed once debugging is done.)
      for(ChunkSet::const_iterator it=done_chunks.begin();
          it!=done_chunks.end(); ++it)
      {
        if( it->end==realchunk.start
            or it->end==realchunk.end
            or it->start==realchunk.start
            or it->start==realchunk.end)
        {
          std::ostringstream err;
          err << "Rank "<<rank<<" chunk calculation returned nonsense! The assigned chunk start or end point is already listed as 'done'! This is a bug in the postprocessor, please report it. Debug output:" <<std::endl;
          err << "Assigned chunk: ["<<realchunk.start << ", " <<realchunk.end<<"]"<<std::endl;
          err << "Conflicting done_chunk: ["<<it->start << ", " <<it->end<<"]"<<std::endl;
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }
      }
      return realchunk;
    }

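    // Worked example (illustrative only): dset_length=10, done_chunks={[2,4]},
    // numtasks=2. The undone points are {0,1,5,6,7,8,9}, so left_to_process=7
    // and the effective chunks are (0,3) for rank 0 and (4,6) for rank 1.
    // Mapping back to real indices gives rank 0 the range [0,6] (of which only
    // the 4 undone points 0,1,5,6 are processed) and rank 1 the range [7,9].
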
    /// Read through the resume data files and reconstruct which chunks of
    /// points have already been processed.
    ChunkSet get_done_points(const std::string& filebase)
    {
      ChunkSet done_chunks;

      // First read the collated chunk data from past resumes, and the number of processes used in the last run
      std::string inprev = filebase+"_prev.dat";

      // Check if it exists (it will not exist on the first resume)
      if(Utils::file_exists(inprev))
      {
        std::ifstream finprev(inprev);
        if(finprev)
        {
          unsigned int prev_size;
          finprev >> prev_size;
          Chunk nextchunk;
          while( finprev >> nextchunk.start >> nextchunk.end )
          {
            done_chunks.insert(nextchunk);
          }

          // Now read each of the chunk files left by each process during the previous run
          for(unsigned int i=0; i<prev_size; ++i)
          {
            std::ostringstream inname;
            inname << filebase << "_" << i << ".dat";
            std::string in = inname.str();
            if(Utils::file_exists(in))
            {
              std::ifstream fin(in);
              if(fin)
              {
                fin >> nextchunk.start >> nextchunk.end;
                done_chunks.insert(nextchunk);
              }
              else
              {
                std::ostringstream err;
                err << "Tried to read postprocessor resume data from "<<in<<" but encountered a read error of some kind (the file seems to exist but appears to be unreadable).";
                Scanner::scan_error().raise(LOCAL_INFO,err.str());
              }
            }
            else
            {
              std::ostringstream err;
              err << "Tried to read postprocessor resume data from "<<in<<" but the file does not exist or is unreadable. We require this file because according to "<<inprev<<" there were "<<prev_size<<" processes in use during the last run, and we require the resume data from all of them.";
              Scanner::scan_error().raise(LOCAL_INFO,err.str());
            }
          }
        }
        else
        {
          std::ostringstream err;
          err << "Tried to read postprocessor resume data from "<<inprev<<" but encountered a read error of some kind (the file seems to exist but appears to be unreadable).";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }
      }
      // Else there is no resume data; assume that this is a new run started without the --restart flag.
      return merge_chunks(done_chunks); // Simplify the chunks and return them
    }

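    // Illustrative sketch of the resume file layout implied by the reads above
    // (values are hypothetical). For filebase "pp_resume":
    //
    //   pp_resume_prev.dat :  4          <- number of processes in the last run
    //                         0 99       <- one "start end" chunk per line
    //                         150 199
    //   pp_resume_0.dat    :  100 120    <- chunk done by rank 0 last run
    //   ...
    //   pp_resume_3.dat    :  140 149    <- chunk done by rank 3 last run
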
    /// Simplify a ChunkSet by merging chunks which overlap
    ChunkSet merge_chunks(const ChunkSet& input_chunks)
    {
      ChunkSet merged_chunks;
      if(input_chunks.size()>0)
      {
        Chunk new_chunk;
        std::size_t prev_chunk_end = 0;
        new_chunk.start = input_chunks.begin()->start; // Start of first chunk
        for(ChunkSet::const_iterator it=input_chunks.begin();
            it!=input_chunks.end(); ++it)
        {
          if(prev_chunk_end!=0 and it->start > prev_chunk_end)
          {
            // Gap detected; close the existing chunk and start a new one.
            new_chunk.end = prev_chunk_end;
            merged_chunks.insert(new_chunk);
            new_chunk.start = it->start;
          }

          if(it->end > prev_chunk_end)
          {
            prev_chunk_end = it->end;
          }
        }
        // No more chunks; close the last open chunk
        new_chunk.end = prev_chunk_end;
        merged_chunks.insert(new_chunk);
        // Sanity check: the starts and ends of the merged chunks should match some start/end in the input chunks
        for(ChunkSet::const_iterator it=merged_chunks.begin();
            it!=merged_chunks.end(); ++it)
        {
          bool found_start = false;
          bool found_end = false;
          for(ChunkSet::const_iterator jt=input_chunks.begin();
              jt!=input_chunks.end(); ++jt)
          {
            if(it->start==jt->start) found_start = true;
            if(it->end==jt->end) found_end = true;
          }
          if(not found_start or not found_end)
          {
            std::ostringstream err;
            err << "Error, merged 'done_chunks' are not consistent with the originally input done_chunks! This indicates a bug in the merge_chunks routine of the postprocessor, please report it. Debug output:" << std::endl;
            err << "Problem merged chunk was ["<<it->start<<","<<it->end<<"]"<<std::endl;
            Scanner::scan_error().raise(LOCAL_INFO,err.str());
          }
          // else fine, move to the next merged chunk
        }
      }
      // else there are no input chunks; just return an empty ChunkSet
      return merged_chunks;
    }

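    // Worked example (illustrative only): merging the overlapping input
    // {[0,5], [3,7], [9,10]} proceeds as follows. [3,7] starts before the
    // running end (5), so it only extends prev_chunk_end to 7; [9,10] starts
    // past 7, so [0,7] is closed and a new chunk opened. Result: {[0,7], [9,10]}.
    // Note that exactly adjacent chunks such as [0,2] and [3,5] are kept
    // separate, since only overlap (not adjacency) is merged.
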
    /// Write the resume data files. These specify which chunks of points have
    /// been processed during this run.
    void record_done_points(const ChunkSet& done_chunks, const Chunk& mydone, const std::string& filebase, unsigned int rank, unsigned int size)
    {
      if(rank == 0)
      {
        // If we are rank 0, output any old chunks from previous resumes to a special file
        // (deleting it first)
        std::string outprev = filebase+"_prev.dat";
        if( Gambit::Utils::file_exists(outprev) )
        {
          if( remove(outprev.c_str()) != 0 )
          {
            perror( ("Error deleting file "+outprev).c_str() );
            std::ostringstream err;
            err << "Unknown error removing old resume data file '"<<outprev<<"'!";
            Scanner::scan_error().raise(LOCAL_INFO,err.str());
          }
        }
        // else it was deleted without problem
        std::ofstream foutprev(outprev);
        foutprev << size << std::endl;
        for(ChunkSet::const_iterator it=done_chunks.begin();
            it!=done_chunks.end(); ++it)
        {
          foutprev << it->start << " " << it->end << std::endl;
        }
        // Check that the write succeeded
        foutprev.close();
        if (!foutprev)
        {
          std::ostringstream err;
          err << "Unknown IO error while writing resume data file '"<<outprev<<"'!";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }
      }
      // Now output what we have done (could overlap with old chunks, but that doesn't really matter)
      std::ostringstream outname;
      outname << filebase << "_" << rank <<".dat";
      std::string out = outname.str();
      if( Gambit::Utils::file_exists(out) )
      {
        if( remove(out.c_str()) != 0 )
        {
          perror( ("Error deleting file "+out).c_str() );
          std::ostringstream err;
          err << "Unknown error removing old resume data file '"<<out<<"'!";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }
      }
      // else it was deleted without problem; write the new file
      std::ofstream fout(out);
      fout << mydone.start << " " << mydone.end << std::endl;
      // Make sure the files had no errors while closing, because they are important.
      fout.close();
      if (!fout)
      {
        std::ostringstream err;
        err << "Unknown IO error while writing resume data file '"<<out<<"'!";
        Scanner::scan_error().raise(LOCAL_INFO,err.str());
      }
      // Gah, data could apparently still be buffered by the OS and not yet written to disk.
      // Apparently on POSIX fsync can be used to ensure this happens, but I am not
      // sure if the following works. This answer on StackOverflow seems to say it doesn't?
      // http://stackoverflow.com/questions/676787/how-to-do-fsync-on-an-ofstream
      //int fd = open(filename, O_APPEND);
      //fsync(fd);
      //close(fd);
      // I may need to convert all these operations to old-school C operations.
    }

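    // A minimal sketch of the POSIX route mentioned above (not used here),
    // assuming <fcntl.h> and <unistd.h> are available:
    //
    //   int fd = open(out.c_str(), O_WRONLY | O_APPEND);
    //   if(fd != -1)
    //   {
    //     fsync(fd);   // flush the kernel's buffers for this file to disk
    //     close(fd);
    //   }
    //
    // Note that fsync only flushes data already handed to the OS; the
    // ofstream's own buffer must be flushed first (hence fout.close() above).
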
    // Gather a bunch of ints from all processes (COLLECTIVE OPERATION)
    #ifdef WITH_MPI
    std::vector<int> allgather_int(int myval, GMPI::Comm& comm)
    {
      const MPI_Datatype datatype = GMPI::get_mpi_data_type<int>::type(); // datatype for ints
      int sendbuf = myval;
      std::vector<int> all_vals(comm.Get_size(),0);
      MPI_Allgather(
          &sendbuf,     /* send buffer */
          1,            /* send count */
          datatype,     /* send datatype */
          &all_vals[0], /* recv buffer */
          1,            /* recv count */
          datatype,     /* recv datatype */
          *(comm.get_boundcomm()) /* communicator */
      );
      return all_vals;
    }
    #endif

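    // Usage sketch (illustrative; 'my_points_left' is a hypothetical variable):
    // each rank contributes one int and receives everyone's values in rank
    // order, e.g. so that the ranks can compare how much work each has left.
    //
    //   #ifdef WITH_MPI
    //   std::vector<int> points_left_by_rank = allgather_int(my_points_left, *comm);
    //   #endif
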
    /// PPDriver member function definitions

    /// Default constructor
    PPDriver::PPDriver()
      : reader(NULL)
      , printer(NULL)
      , LogLike()
      , new_params()
      , req_models()
      , longname()
      , all_params()
      , data_labels()
      , data_labels_copy()
      , add_to_logl()
      , subtract_from_logl()
      , renaming_scheme()
      , cut_less_than()
      , cut_greater_than()
      , discard_points_outside_cuts()
      , update_interval()
      , discard_old_logl()
      , logl_purpose_name()
      , reweighted_loglike_name()
      , root()
      , numtasks()
      , rank()
    #ifdef WITH_MPI
      , comm(NULL)
    #endif
    {}

    /// Constructor from a reader, printer, likelihood container and options object
    PPDriver::PPDriver(
        Printers::BaseBaseReader* const r
      , Printers::BaseBasePrinter* const p
      , Scanner::like_ptr const l
      , const PPOptions& o
      )
      : reader(r)
      , printer(p)
      , LogLike(l)
      , new_params()
      , req_models()
      , longname()
      , all_params (o.all_params )
      , data_labels (o.data_labels )
      , data_labels_copy ()
      , add_to_logl (o.add_to_logl )
      , subtract_from_logl (o.subtract_from_logl )
      , renaming_scheme (o.renaming_scheme )
      , cut_less_than (o.cut_less_than )
      , cut_greater_than (o.cut_greater_than )
      , discard_points_outside_cuts (o.discard_points_outside_cuts )
      , update_interval (o.update_interval )
      , discard_old_logl (o.discard_old_logl )
      , logl_purpose_name (o.logl_purpose_name )
      , reweighted_loglike_name (o.reweighted_loglike_name )
      , root (o.root )
      , numtasks (o.numtasks )
      , rank (o.rank )
    #ifdef WITH_MPI
      , comm (o.comm )
    #endif
    {
      // Retrieve parameter and model names
      std::vector<std::string> keys = getLogLike()->getPrior().getParameters(); // used to use get_keys() in the objective (prior) plugin

      // Pull the keys apart into model-name, parameter-name pairs
      if(rank==0) std::cout << "Number of model parameters to be retrieved from previous output: "<< keys.size() <<std::endl;
      for(auto it=keys.begin(); it!=keys.end(); ++it)
      {
        if(rank==0) std::cout << "   " << *it << std::endl;
        std::vector<std::string> splitkey = Utils::delimiterSplit(*it, "::");
        std::string model = splitkey[0];
        std::string par   = splitkey[1];
        req_models[model].push_back(par);
        longname[model][par] = *it;
      }
    #ifdef WITH_MPI
      if(comm==NULL)
      {
        std::ostringstream err;
        err << "No MPI communicator supplied to the postprocessor driver object! This is a bug in the postprocessor scanner plugin, please report it.";
        Scanner::scan_error().raise(LOCAL_INFO,err.str());
      }
    #endif
    }

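    // Illustrative example (hypothetical key): a prior key "NormalDist::mu"
    // splits at "::" into model "NormalDist" and parameter "mu", so that
    // req_models["NormalDist"] gains "mu" and
    // longname["NormalDist"]["mu"] == "NormalDist::mu" recovers the full name.
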
    /// Safe accessors for the pointer data members
    Printers::BaseBaseReader& PPDriver::getReader()
    {
      if(reader==NULL)
      {
        std::ostringstream err;
        err << "Postprocessor tried to access the reader object, but found only a NULL pointer! The postprocessor has therefore not been set up correctly, please report this bug.";
        Scanner::scan_error().raise(LOCAL_INFO,err.str());
      }
      return *reader;
    }

    Printers::BaseBasePrinter& PPDriver::getPrinter()
    {
      if(printer==NULL)
      {
        std::ostringstream err;
        err << "Postprocessor tried to access the printer object, but found only a NULL pointer! The postprocessor has therefore not been set up correctly, please report this bug.";
        Scanner::scan_error().raise(LOCAL_INFO,err.str());
      }
      return *printer;
    }

    Scanner::like_ptr PPDriver::getLogLike()
    {
      // Actually it is a strange Greg-pointer; it seems it can't be set to NULL.
      // if(LogLike==NULL)
      // {
      //   std::ostringstream err;
      //   err << "Postprocessor tried to access the LogLike object, but found only a NULL pointer! The postprocessor has therefore not been set up correctly, please report this bug.";
      //   Scanner::scan_error().raise(LOCAL_INFO,err.str());
      // }
      return LogLike;
    }

    /// Check whether any other process has requested a workload redistribution
    bool PPDriver::check_for_redistribution_request()
    {
      bool request_seen = false;
    #ifdef WITH_MPI
      request_seen = comm->Iprobe(MPI_ANY_SOURCE, REDIST_REQ);
    #endif
      return request_seen;
    }

    void PPDriver::send_redistribution_request()
    {
      // Inform all processes of the resynchronisation request
    #ifdef WITH_MPI
      int nullbuf = 0;
      comm->IsendToAll(&nullbuf, 0, REDIST_REQ);
    #endif
    }

    void PPDriver::clear_redistribution_requests()
    {
      // Ensure all redistribution request messages are recv'd
    #ifdef WITH_MPI
      while(comm->Iprobe(MPI_ANY_SOURCE, REDIST_REQ)) // loop until no pending requests remain
      {
        int nullbuf;
        comm->Recv(&nullbuf, 1, MPI_ANY_SOURCE, REDIST_REQ);
      }
    #endif
    }

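    // Sketch of the redistribution handshake implied by the three functions
    // above (illustrative): a rank that finishes its batch early calls
    // send_redistribution_request(); the other ranks notice it via
    // check_for_redistribution_request() inside their main loops and stop at a
    // convenient point; clear_redistribution_requests() then drains the message
    // queue before the remaining workload is divided up again.
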
    /// Check the postprocessor settings for consistency and general validity
    void PPDriver::check_settings()
    {
      new_params = all_params; // Parameters not present in the input file, to be deduced here. Start from everything and remove those found in the input file.

      if(not discard_old_logl)
      {
        if(std::find(data_labels.begin(), data_labels.end(), logl_purpose_name)
           != data_labels.end())
        {
          std::ostringstream err;
          err << "Error starting postprocessing run! The 'purpose' name selected for the likelihood to be computed ('"<<logl_purpose_name<<"') collides with an entry in the chosen input data. Please either change the name given in the scanner option 'like', or set 'permit_discard_old_likes' to 'true' to allow the old data to be replaced in the new output.";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }
        if(std::find(data_labels.begin(), data_labels.end(), reweighted_loglike_name)
           != data_labels.end())
        {
          std::ostringstream err;
          err << "Error starting postprocessing run! The label name selected for the result of likelihood weighting ('"<<reweighted_loglike_name<<"') collides with an entry in the chosen input data. Please either change the name given in the scanner option 'reweighted_like', or set 'permit_discard_old_likes' to 'true' to allow the old data to be replaced in the new output.";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }
      }

      // Check that the dataset renaming scheme is valid:
      // keys are "in_label" (names in the input file), values are "out_label" (names in the new output)
      for(std::map<std::string,std::string>::iterator it = renaming_scheme.begin(); it!=renaming_scheme.end(); ++it)
      {
        std::string in_label = it->first;
        std::string out_label = it->second;

        // Make sure the input label actually exists
        if(std::find(data_labels.begin(), data_labels.end(), in_label)
           == data_labels.end())
        {
          // Whoops, could not find this label in the input data
          std::ostringstream err;
          err << "Could not find data labelled '"<<in_label<<"' in the input dataset for postprocessing! In your master YAML file you have requested this data to be relabelled to '"<<out_label<<"', however it could not be found under the specified input label.";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }

        // Make sure the chosen output name is not already claimed by the printer
        if(std::find(all_params.begin(), all_params.end(), out_label)
           != all_params.end())
        {
          // Whoops, name already in use by something else!
          std::ostringstream err;
          err << "Cannot rename dataset '"<<in_label<<"' to '"<<out_label<<"'! The requested output label has already been claimed by some other component in the scan. Please choose a different output label for this dataset in the master YAML file, or remove it from the 'rename' options for the postprocessor scanner plugin.";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }

        // Make sure the chosen output name doesn't clash with an un-renamed item to be copied
        std::set<std::string>::iterator jt = std::find(data_labels.begin(), data_labels.end(), out_label);
        if(jt != data_labels.end())
        {
          // Potential clash; check whether that name is itself going to be changed
          std::map<std::string,std::string>::iterator kt = renaming_scheme.find(*jt);
          if(kt == renaming_scheme.end())
          {
            // Not getting renamed! Error
            std::ostringstream err;
            err << "Cannot rename dataset '"<<in_label<<"' to '"<<out_label<<"'! The requested output label clashes with an item in the input dataset (which isn't getting renamed). Please choose a different output label for this dataset in the master YAML file, remove it from the 'rename' options for the postprocessor scanner plugin, or request for the conflicting input label to be renamed.";
            Scanner::scan_error().raise(LOCAL_INFO,err.str());
          }
          // Could still be a problem if the renamed name clashes, but we check for that separately
        }

        // Make sure the chosen output name doesn't clash with another renamed name
        for(std::map<std::string,std::string>::iterator jt = renaming_scheme.begin();
            jt!=renaming_scheme.end(); ++jt)
        {
          if((jt->second==it->second) and (jt->first!=it->first))
          {
            // The out_labels match (and we aren't just clashing with ourselves),
            // so this is forbidden
            std::ostringstream err;
            err << "Cannot rename dataset '"<<in_label<<"' to '"<<out_label<<"'! The requested output label has already been claimed by some other item in the renaming scheme (you requested '"<<jt->first<<"' to also be renamed to '"<<jt->second<<"'). Please choose a different output label for one of these datasets in the master YAML file, or remove one of them from the 'rename' options for the postprocessor scanner plugin.";
            Scanner::scan_error().raise(LOCAL_INFO,err.str());
          }
        }

        // Make sure the user isn't trying to rename a protected name (MPIrank, pointID)
        if(in_label=="MPIrank" or in_label=="pointID")
        {
          std::ostringstream err;
          err << "Cannot rename dataset '"<<in_label<<"' to '"<<out_label<<"'! The input dataset has a special purpose, so renaming it is forbidden. Please remove it from the 'rename' options for the postprocessor scanner plugin in your master YAML file.";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }
      }

      // Check that the cut maps refer to input datasets that actually exist
      for(std::map<std::string,double>::iterator it = cut_less_than.begin(); it!=cut_less_than.end(); ++it)
      {
        std::string in_label = it->first;
        double cut_value = it->second;

        // Make sure the input label actually exists
        if(std::find(data_labels.begin(), data_labels.end(), in_label)
           == data_labels.end())
        {
          // Whoops, could not find this label in the input data
          std::ostringstream err;
          err << "Could not find data labelled '"<<in_label<<"' in the input dataset for postprocessing! In your master YAML file you have requested to only postprocess points satisfying the criterion '"<<in_label<<"' <= "<<cut_value<<", however the requested dataset for cutting could not be found under the specified input label. Please fix the label or remove this entry from the 'cut_less_than' list.";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }

        // Make sure it has type 'double'
        if(getReader().get_type(in_label) != Printers::getTypeID<double>())
        {
          std::ostringstream err;
          err << "Type of input dataset '"<<in_label<<"' is not 'double'! In your master YAML file you have requested to only postprocess points satisfying the criterion '"<<in_label<<"' <= "<<cut_value<<", however the requested dataset for cutting cannot be retrieved as type 'double'. Currently cuts can only be applied to datasets stored as doubles, sorry! Please remove this entry from the 'cut_less_than' list.";
          // DEBUG
          err << std::endl << "input type ID:" << getReader().get_type(in_label) << std::endl;
          err << "double type ID:" << Printers::getTypeID<double>() << std::endl;
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }
      }
      for(std::map<std::string,double>::iterator it = cut_greater_than.begin(); it!=cut_greater_than.end(); ++it)
      {
        std::string in_label = it->first;
        double cut_value = it->second;

        // Make sure the input label actually exists
        if(std::find(data_labels.begin(), data_labels.end(), in_label)
           == data_labels.end())
        {
          // Whoops, could not find this label in the input data
          std::ostringstream err;
          err << "Could not find data labelled '"<<in_label<<"' in the input dataset for postprocessing! In your master YAML file you have requested to only postprocess points satisfying the criterion '"<<in_label<<"' >= "<<cut_value<<", however the requested dataset for cutting could not be found under the specified input label. Please fix the label or remove this entry from the 'cut_greater_than' list.";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }

        // Make sure it has type 'double'
        if(getReader().get_type(in_label) != Printers::getTypeID<double>())
        {
          std::ostringstream err;
          err << "Type of input dataset '"<<in_label<<"' is not 'double'! In your master YAML file you have requested to only postprocess points satisfying the criterion '"<<in_label<<"' >= "<<cut_value<<", however the requested dataset for cutting cannot be retrieved as type 'double'. Currently cuts can only be applied to datasets stored as doubles, sorry! Please remove this entry from the 'cut_greater_than' list.";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }
      }

      // Check which data are to be copied and which are to be recomputed
      if(rank==0) std::cout << "Determining which data are to be copied from the input file to the new output file, and which will be recomputed..." <<std::endl;
      if(rank==0) std::cout << "  Datasets found in input file: " << std::endl;
      for(auto it = data_labels.begin(); it!=data_labels.end(); ++it)
      {
        // Check whether any parameters we plan to copy have already been registered by the
        // printer system.
        // This is actually a little tricky, since the names of parameters can be modified
        // in the output depending on which printer was used. So far we have kept a certain
        // consistency that can be exploited, but it isn't enforced. Should note this somewhere
        // in the printer documentation.
        // For example, when ModelParameters are printed, they have their actual parameter names
        // appended and are output as separate datasets/columns. Likewise for vector
        // components. But this appending rule is so far consistent, so I think we can just
        // check that no prefix substring of the proposed copy has already been registered.
        // Not sure whether there is a danger of observable names accidentally being prefixes of
        // some other name? (See the illustrative example below.)
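        // Illustrative example (hypothetical labels): if all_params contains
        // "MyModel_parameters", then input labels such as
        // "MyModel_parameters:m0" or "MyModel_parameters[0]" are treated as
        // belonging to it, because they start with the registered name plus one
        // of the separator characters (':', '[', '{', '%', '#') tested below.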
        bool is_new = true;
        for(auto jt = all_params.begin(); jt!=all_params.end(); ++jt)
        {
          if( ( (*it)==(*jt) )
              or Gambit::Utils::startsWith(*it,(*jt)+":")
              or Gambit::Utils::startsWith(*it,(*jt)+"[")
              or Gambit::Utils::startsWith(*it,(*jt)+"{")
              or Gambit::Utils::startsWith(*it,(*jt)+"%")
              or Gambit::Utils::startsWith(*it,(*jt)+"#")
            ) // i.e. if [input data label] starts with [existing parameter] (plus an appended separator character, for extra info like a parameter name or index)
          {
            // Then it is not new. We are not allowed to copy it; the likelihood container is already printing it anew.
            new_params.erase(*jt);
            is_new = false;
            break;
          }
        }

        if(is_new)
        {
          data_labels_copy.insert(*it); // Not otherwise printed; schedule for copying
          if(rank==0) std::cout << "   copy     : "<< (*it) <<std::endl;
          // Check if it is getting relabelled
          std::map<std::string,std::string>::iterator jt = renaming_scheme.find(*it);
          if(jt != renaming_scheme.end())
          {
            // Yep, getting relabelled
            if(rank==0) std::cout << "    to --> : "<< jt->second <<std::endl;
          }
        }
        else
        {
          if(rank==0) std::cout << "   recompute: "<< (*it) <<std::endl;
          // Check if it is getting relabelled
          std::map<std::string,std::string>::iterator jt = renaming_scheme.find(*it);
          if(jt != renaming_scheme.end())
          {
            // Yep, getting relabelled
            data_labels_copy.insert(*it); // Allowed to copy this after all, since the name will be changed
            if(rank==0)
            {
              std::cout << "    with old data copied"<<std::endl;
              std::cout << "    to --> : "<< jt->second <<std::endl;
            }
          }
        }
        // Check whether a cut is being applied on this input dataset
        if(rank==0)
        {
          std::map<std::string,double>::iterator jt = cut_less_than.find(*it);
          if(jt != cut_less_than.end())
          {
            std::cout << "    with cut <= "<< jt->second <<std::endl;
          }
          std::map<std::string,double>::iterator kt = cut_greater_than.find(*it);
          if(kt != cut_greater_than.end())
          {
            std::cout << "    with cut >= "<< kt->second <<std::endl;
          }
        }
      }
      // Might as well also list the new datasets scheduled for creation
      if(rank==0)
      {
        std::cout << "  New datasets to be added: " << std::endl;
        for(auto it = new_params.begin(); it!=new_params.end(); ++it)
        {
          std::cout << "   " << *it << std::endl;
        }
      }
      if(rank==0) std::cout << "Copy analysis complete." <<std::endl;

      if(not discard_old_logl)
      {
        // Check whether any of the likelihood components being added to or subtracted from the likelihood
        // are going to be replaced in the new output. The user must set 'permit_discard_old_likes' to explicitly allow this.
        for(auto it=add_to_logl.begin(); it!=add_to_logl.end(); ++it)
        {
          if(std::find(all_params.begin(), all_params.end(), *it)
             != all_params.end())
          {
            std::ostringstream err;
            err << "Error starting postprocessing run! One of the data entries listed in the option 'add_to_like' is scheduled to be recalculated during postprocessing ("<<*it<<"). This is permitted; the old value will be added to 'like' and then discarded and replaced by the new value, however you must explicitly permit this to occur by setting 'permit_discard_old_likes' to 'true'.";
            Scanner::scan_error().raise(LOCAL_INFO,err.str());
          }
        }

        for(auto it=subtract_from_logl.begin(); it!=subtract_from_logl.end(); ++it)
        {
          if(std::find(all_params.begin(), all_params.end(), *it)
             != all_params.end())
          {
            std::ostringstream err;
            err << "Error starting postprocessing run! One of the data entries listed in the option 'subtract_from_like' is scheduled to be recalculated during postprocessing ("<<*it<<"). This is permitted; the old value will be subtracted from 'like' and then discarded and replaced by the new value, however you must explicitly permit this to occur by setting 'permit_discard_old_likes' to 'true'.";
            Scanner::scan_error().raise(LOCAL_INFO,err.str());
          }
        }
      }
    }

    /// The main run loop
    int PPDriver::run_main_loop(const ChunkSet& done_chunks)
    {
      // Compute which points this process is supposed to process. Divide up
      // the input dataset by the number of MPI tasks.
      if(rank==0) std::cout<<"Computing work assignments (may take a little time for very large datasets)"<<std::endl;
      unsigned long long total_length = getReader().get_dataset_length();
      Chunk mychunk = get_my_chunk(total_length, done_chunks, rank, numtasks);

      // Loop over the old points
      getReader().reset(); // Make sure we start from the beginning of the file upon redistributing work
      PPIDpair current_point = getReader().get_next_point(); // Get the first point
      std::size_t loopi = 0; // Track the true index of the input file
      std::size_t ppi = 0;   // Track the number of points actually processed
      if(rank==0) std::cout << "Starting loop over old points ("<<total_length<<" in total)" << std::endl;
      std::cout << "This task (rank "<<rank<<" of "<<numtasks<<") will process iterations "<<mychunk.start<<" through to "<<mychunk.end<<", excluding any points that may have already been processed as recorded by the resume data. This leaves "<<mychunk.eff_length<<" points for this rank to process."<<std::endl;

      std::size_t n_passed = 0; // Number of points which have passed any user-specified cuts

      // Disable auto-incrementing of pointIDs in the likelihood container. We will set these manually.
      Printers::auto_increment() = false;

      bool quit = false; // Flag to abort the 'scan' early
      bool redistribution_requested = false; // Flag to temporarily stop and redistribute the remaining workload amongst the MPI processes
      bool redistribution_request_ignored = false;
      bool stop_loop = false;

      if(getReader().eoi())
      {
        std::cout << "Postprocessor (rank "<<rank<<") immediately reached the end of the input file! Skipping execution of the main loop..."<<std::endl;
        // We should exit with the "unexpected finish" error code if this has happened.
      }

      ChunkSet::iterator current_done_chunk=done_chunks.begin(); // Used to skip past points that are already done
      while(not getReader().eoi() and not stop_loop) // while not end of input
      {
        loopi++;
        // Make sure we didn't somehow get desynchronised from the reader's internal index
        if(loopi!=getReader().get_current_index())
        {
          std::ostringstream err;
          err << "The postprocessor has become desynchronised from its assigned reader object! The postprocessor index is "<<loopi<<" but the reader index is "<<getReader().get_current_index()<<"! This indicates a bug in either the postprocessor or the reader, please report it.";
          Scanner::scan_error().raise(LOCAL_INFO,err.str());
        }

        // Cancel processing of iterations beyond our assigned range
        if(loopi>mychunk.end)
        {
          std::cout << "Rank "<<rank<<" has reached the end of its batch, stopping iteration." << std::endl;
          loopi--; // Return the counter to the last index that we actually processed
          break;
        }

        // If we have moved past the end of the currently selected batch of "done"
        // points, then select the next batch (if there are any left)
        //if(current_done_chunk!=done_chunks.end()) std::cout << "Rank "<<rank<<": loopi="<<loopi<<", current_done_chunk=["<<current_done_chunk->start<<","<<current_done_chunk->end<<"]"<<std::endl;
        if(current_done_chunk!=done_chunks.end() and loopi > current_done_chunk->end)
        {
          ++current_done_chunk;
        }

        // Skip the loop ahead to the batch of points we are assigned to process,
        // and skip any points that have already been processed
        if(loopi<mychunk.start or (current_done_chunk!=done_chunks.end() and current_done_chunk->iContain(loopi)))
        {
          current_point = getReader().get_next_point();
          continue;
        }

        if((ppi % update_interval) == 0 and ppi!=0)
        {
          // Progress report
          std::cout << "Rank "<<rank<<" has processed "<<ppi<<" of "<<mychunk.eff_length<<" points ("<<100*ppi/mychunk.eff_length<<"%, with "<<100*n_passed/ppi<<"% passing all cuts)"<<std::endl;
        }
        ppi++; // Processing is go, update the counter

        // Data about the current point in the input file
        if(current_point == Printers::nullpoint)
        {
          // No valid data here, go to the next point
          current_point = getReader().get_next_point();
          continue;
        }
        unsigned int MPIrank = current_point.rank;
        unsigned long long pointID = current_point.pointID;
        //std::cout << "Rank: "<<rank<<", current iteration: "<<loopi<<", current point:" << MPIrank << ", " << pointID << std::endl;

        // Storage for the retrieved parameters
        std::unordered_map<std::string, double> outputMap;

        // Extract the model parameters
        bool valid_modelparams = get_ModelParameters(outputMap);

        // Check whether valid model parameters were extracted. If not, something may be wrong with the input file, or we could just be at the end of a buffer (e.g. in the HDF5 case). We can't tell the difference, so just skip the point and continue.
        if(not valid_modelparams)
        {
          std::cout << "Skipping point "<<loopi<<" as it has no valid ModelParameters" <<std::endl;
          current_point = getReader().get_next_point();
          continue;
        }

        // Determine whether the model point passes the user-requested cuts.
        // This is a little tricky because we don't know the type of the input dataset.
        // For now we restrict the system so that it only works for datasets with
        // type 'double' (which is most stuff). We check for this earlier, so here we
        // can just assume that the requested datasets have the correct type.

        bool cuts_passed = true; // Will be set to false if any cut fails, or a required entry is invalid
        for(std::map<std::string,double>::iterator it = cut_less_than.begin();
            it!=cut_less_than.end(); ++it)
        {
          if(cuts_passed)
          {
            std::string in_label = it->first;
            double cut_value = it->second;
            double buffer;
            bool valid = getReader().retrieve(buffer, in_label);
            if(valid)
            {
              cuts_passed = (buffer <= cut_value);
            }
            else
            {
              cuts_passed = false;
            }
          }
        }

        for(std::map<std::string,double>::iterator it = cut_greater_than.begin();
            it!=cut_greater_than.end(); ++it)
        {
          if(cuts_passed)
          {
            std::string in_label = it->first;
            double cut_value = it->second;
            double buffer;
            bool valid = getReader().retrieve(buffer, in_label);
            if(valid)
            {
              cuts_passed = (buffer >= cut_value);
            }
            else
            {
              cuts_passed = false;
            }
          }
        }

        if(cuts_passed) // Else skip the new calculations and go straight to copying the old results
        {
          n_passed += 1; // Counter for the number of points which have passed all the cuts
          // Before calling the likelihood function, we need to set up the printer to
          // output correctly. The auto-incrementing of pointIDs cannot be used,
          // because we need to match the old scan results. So we must set them manually.
          // This is currently a little clunky but it works. Make sure to have turned
          // off auto-incrementing (see above).
          // The printer should still print to files split according to the actual rank; this
          // should only change the assigned pointID pair tag, which should already be
          // properly unambiguous if the original scan was done properly.
          // Note: this might fail for merged datasets from separate runs. Not sure what the solution
          // for that is.
          getLogLike()->setRank(MPIrank); // For purposes of printing only
          getLogLike()->setPtID(pointID);

          // We feed the unit hypercube and/or transformed parameter map into the likelihood container. ScannerBit
          // interprets the map values as post-transformation, does not apply a prior to them, and ensures that the
          // length of the cube plus the number of transformed parameters adds up to the total number of parameters.
          double new_logL = getLogLike()(outputMap); // Here we supply *only* the map; no parameters to transform.

          // Add old likelihood components as requested in the inifile
          if (not add_to_logl.empty() or not subtract_from_logl.empty())
          {
            double combined_logL = new_logL;
            bool is_valid(true);

            for(auto it=add_to_logl.begin(); it!=add_to_logl.end(); ++it)
            {
              std::string old_logl = *it;
              if(std::find(data_labels.begin(), data_labels.end(), old_logl)
                 == data_labels.end())
              {
                std::ostringstream err;
                err << "In the input YAML file, you requested to 'add_to_like' the component '"<<old_logl<<"' from your input data file, however this does not match any of the data labels retrieved from the input data file you specified. Please check the spelling, path, etc. and try again.";
                Scanner::scan_error().raise(LOCAL_INFO,err.str());
              }
              if(getReader().get_type(*it) != Gambit::Printers::getTypeID<double>())
              {
                std::ostringstream err;
                err << "In the input YAML file, you requested the 'add_to_like' component '"<<old_logl<<"' from your input data file, however this data cannot be retrieved as type 'double', therefore it cannot be used as a likelihood component. Please enter a different data label and try again.";
                Scanner::scan_error().raise(LOCAL_INFO,err.str());
              }

              double old_logl_value;
              is_valid = is_valid and getReader().retrieve(old_logl_value, old_logl);
              if(is_valid)
              {
                // Combine with the new logL component
                combined_logL += old_logl_value;
              }
              // Else the old likelihood value didn't exist for this point; we cannot combine with a non-existent likelihood, so don't print the reweighted value.
            }

            // Now do the same thing for the components we want to subtract.
            for(auto it=subtract_from_logl.begin(); it!=subtract_from_logl.end(); ++it)
            {
              std::string old_logl = *it;
              if(std::find(data_labels.begin(), data_labels.end(), old_logl)
                 == data_labels.end())
              {
                std::ostringstream err;
                err << "In the input YAML file, you requested to 'subtract_from_like' the component '"<<old_logl<<"' from your input data file, however this does not match any of the data labels retrieved from the input data file you specified. Please check the spelling, path, etc. and try again.";
                Scanner::scan_error().raise(LOCAL_INFO,err.str());
              }
              if(getReader().get_type(*it) != Gambit::Printers::getTypeID<double>())
              {
                std::ostringstream err;
                err << "In the input YAML file, you requested the 'subtract_from_like' component '"<<old_logl<<"' from your input data file, however this data cannot be retrieved as type 'double', therefore it cannot be used as a likelihood component. Please enter a different data label and try again.";
                Scanner::scan_error().raise(LOCAL_INFO,err.str());
              }

              double old_logl_value;
              is_valid = is_valid and getReader().retrieve(old_logl_value, old_logl);
              if(is_valid)
              {
                // Combine with the new logL component, subtracting this time
                combined_logL -= old_logl_value;
              }
              // Else the old likelihood value didn't exist for this point; we cannot combine with a non-existent likelihood, so don't print the reweighted value.
            }

            // Output the new reweighted likelihood (if all components were valid)
            if(is_valid) getPrinter().print(combined_logL, reweighted_loglike_name, MPIrank, pointID);
          }
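          // Worked example (illustrative numbers and labels): with
          // add_to_like: ["LogLike_old"] and subtract_from_like: ["lnL_b"],
          // a point with new_logL = -12.0, LogLike_old = -30.5 and lnL_b = -2.5
          // gets combined_logL = -12.0 + (-30.5) - (-2.5) = -40.0, printed
          // under the label given by the 'reweighted_like' option.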

        }
        else if(not discard_points_outside_cuts)
        {
          // The point failed the cuts, but we still print its identifying
          // information so that the copied data stays synchronised with the output
          getPrinter().print(MPIrank, "MPIrank", MPIrank, pointID);
          getPrinter().print(pointID, "pointID", MPIrank, pointID);
          // Now the model parameters
          for(auto it=req_models.begin(); it!=req_models.end(); ++it)
          {
            ModelParameters modelparameters;
            std::string model = it->first;
            bool is_valid = getReader().retrieve(modelparameters, model);
            if(is_valid)
            {
              // Use the OutputName set by the reader to preserve the original naming of the model parameters.
              getPrinter().print(modelparameters, modelparameters.getOutputName(), MPIrank, pointID);
            }
          }
        }

        // Copy the scheduled data from the input file, unless this point failed the cuts
        if(not cuts_passed and discard_points_outside_cuts)
        {
          // Don't copy in this case, just discard the old data.
        }
        else
        {
          for(std::set<std::string>::iterator it = data_labels_copy.begin(); it!=data_labels_copy.end(); ++it)
          {
            // Check whether this input label has been mapped to a different output label.
            std::string in_label = *it;
            std::map<std::string,std::string>::iterator jt = renaming_scheme.find(in_label);
            if(jt != renaming_scheme.end())
            {
              // Found a match! Do the renaming
              std::string out_label = jt->second;
              //std::cout << "Copying data from "<<in_label<<", to output name "<<out_label<<", for point ("<<MPIrank<<", "<<pointID<<")" <<std::endl;
              getReader().retrieve_and_print(in_label,out_label,getPrinter(), MPIrank, pointID);
            }
            else
            {
              // No match, keep the old name
              //std::cout << "Copying data from "<<in_label<<" for point ("<<MPIrank<<", "<<pointID<<")" <<std::endl;
              getReader().retrieve_and_print(in_label,getPrinter(), MPIrank, pointID);
            }
          }
        }

        // Check whether the calling code wants us to shut down early

        // Check whether some other process wants us to stop and redistribute the
        // processing workload
        if(not redistribution_request_ignored)
        {
          redistribution_requested = check_for_redistribution_request();
          //if(not redistribution_requested) std::cout << "Rank "<<rank<<": Still looping, no redistribution request seen." << std::endl;
        }

        if(quit)
        {
          // Need to save data about which points have been processed, so we
          // can resume processing from here.
          //std::cout << "Postprocessor (rank "<<rank<<") received quit signal! Aborting run." << std::endl;
          stop_loop = true;
        }
        else if(redistribution_requested)
        {
          //std::cout << "Postprocessor (rank "<<rank<<") received redistribution request!" << std::endl;
          // Decide whether we are going to stop for the redistribution. Don't bother if we
          // only have a small number of points left to process.
          std::size_t points_left = mychunk.eff_length - ppi;

          // How many points should we use as a threshold? It depends on how long they
          // take to evaluate and how many cpus are available. For now we pick something
          // fairly simple: if there are fewer than 10 points left, or if there were fewer than
          // 100 points to start with, then we won't bother redistributing.
          if(points_left > 10 and mychunk.eff_length > 100) // Make an option to vary this?
          {
            stop_loop = true;
          }
          else
          {
            // Just ignore the request and finish our batch normally.
            //std::cout << "Postprocessor (rank "<<rank<<") ...but we have less than 10 points left so we will finish them first." << std::endl;
            redistribution_requested = false;
            redistribution_request_ignored = true; // deactivate the request
          }
        }

        // Get the next point
        if(not stop_loop) current_point = getReader().get_next_point();
      }

      // Check whether we finished because of reaching the end of the input
      if(getReader().eoi() and loopi!=mychunk.end)
      {
        std::cout << "Postprocessor (rank "<<rank<<") reached the end of the input file! (debug: was this the end of our batch? loopi="<<loopi<<", mychunk.end="<<mychunk.end<<", dset_length="<<getReader().get_dataset_length()<<")"<<std::endl;
      }

      // Write the resume data (even if we finished; other processes might not have)
      //std::cout<<"Writing resume data (rank "<<rank<<")...."<< std::endl;
      Chunk mydonechunk(mychunk.start,loopi);
      record_done_points(done_chunks, mydonechunk, root, rank, numtasks);

      // We now set the return code to inform the calling code of why we stopped:
      //  0 - Finished processing all the points we were assigned
      //  1 - Saw the quit flag and so stopped prematurely
      //  2 - Was told to stop by some other process for redistribution of the postprocessing work
      //  3 - Encountered the end of the input file unexpectedly
      int exit_code = 0;
      if(quit)
      {
        exit_code = 1;
      }
      else if(redistribution_requested) // and if we obeyed it; if we ignored it then some other exit code is returned
      {
        exit_code = 2;
      }
      else if(getReader().eoi() and loopi!=mychunk.end)
      {
        exit_code = 3;
      }
      return exit_code;
    }

    // Extract the model parameters from the reader
    bool PPDriver::get_ModelParameters(std::unordered_map<std::string, double>& outputMap)
    {
      bool valid_modelparams = true;
      for(auto it=req_models.begin(); it!=req_models.end(); ++it)
      {
        ModelParameters modelparameters;
        std::string model = it->first;
        bool is_valid = getReader().retrieve(modelparameters, model);
        if(not is_valid)
        {
          valid_modelparams = false;
          //std::cout << "ModelParameters marked 'invalid' for model "<<model<<"; point will be skipped." << std::endl;
        }
        //std::cout << "Retrieved parameters for model '"<<model<<"' at point:" << std::endl;
        //std::cout << " ("<<MPIrank<<", "<<pointID<<") (rank,pointID)" << std::endl;
        //const std::vector<std::string> names = modelparameters.getKeys();
        //for(std::vector<std::string>::const_iterator
        //    kt = names.begin();
        //    kt!= names.end(); ++kt)
        //{
        //  std::cout << " " << *kt << " : " << modelparameters[*kt] << std::endl;
        //}

        // Check that all the required parameters were retrieved.
        // We could actually do this in the constructor for the scanner plugin, which would be better, but a little more complicated. TODO: do this later.
        std::vector<std::string> req_pars = it->second;
        std::vector<std::string> retrieved_pars = modelparameters.getKeys();
        for(auto jt = req_pars.begin(); jt != req_pars.end(); ++jt)
        {
          std::string par = *jt;
          if(std::find(retrieved_pars.begin(), retrieved_pars.end(), par)
             == retrieved_pars.end())
          {
            std::ostringstream err;
            err << "Error! The reader could not retrieve the required parameter '"<<par<<"' for the model '"<<model<<"' from the supplied data file! Please check that this parameter indeed exists in that file." << std::endl;
            Scanner::scan_error().raise(LOCAL_INFO,err.str());
          }

          // If it was found, add it to the return map
          outputMap[ longname[model][par] ] = modelparameters[par];
        }
      }
      return valid_modelparams;
    }

  }
}