gambit is hosted by Hepforge, IPPP Durham
GAMBIT  v1.5.0-2191-ga4742ac
a Global And Modular Bsm Inference Tool
sqliteprinter.cpp
Go to the documentation of this file.
1 // GAMBIT: Global and Modular BSM Inference Tool
2 // *********************************************
20 
21 #include <iostream>
22 #include <sstream>
23 #include <chrono>
24 #include <thread>
25 
26 // SQLite3 C interface
27 #include <sqlite3.h>
28 
29 // Gambit
31 #include "gambit/Logs/logger.hpp"
32 
33 // Define this macro to dump attempted SQL statements during exceptions
34 #define SQL_DEBUG
35 
36 namespace Gambit
37 {
38  namespace Printers
39  {
40 
41  // Constructor
// Construct an SQLitePrinter from its options node.
//
// An auxilliary printer (option "auxilliary", default false) takes the database
// file and table name from its primary printer. A primary printer instead:
//   - records the MPI rank/size (when built WITH_MPI),
//   - registers the "pairID" dataset and reads the mandatory "resume" option,
//   - builds the database path from "output_path" (or "default_output_path")
//     plus the mandatory "output_file" option,
//   - reads "table_name" (default "results"),
//   - on rank 0, deletes an existing database file when "delete_file_on_restart"
//     (default false) is set and the run is not resuming.
// Both kinds of printer then open the database and create the results table if
// needed. A resuming primary printer finally queries the highest pointID stored
// for its own MPI rank and stores it via get_point_id().
//
// Parameters:
//   options - printer options node (keys listed above, plus "buffer_length",
//             default 1, which initialises max_buffer_length)
//   primary - forwarded to the BasePrinter constructor
42  SQLitePrinter::SQLitePrinter(const Options& options, BasePrinter* const primary)
43  : BasePrinter(primary,options.getValueOrDef<bool>(false,"auxilliary"))
44  , SQLiteBase()
45 #ifdef WITH_MPI
46  , myComm() // initially attaches to MPI_COMM_WORLD
47 #endif
48  , mpiRank(0)
49  , mpiSize(1)
50  , primary_printer(NULL)
51  , column_record()
52  , max_buffer_length(options.getValueOrDef<std::size_t>(1,"buffer_length"))
53  , buffer_info()
54  , buffer_header()
55  , transaction_data_buffer()
// 'synchronised' is simply the negation of the "auxilliary" option
56  , synchronised(!options.getValueOrDef<bool>(false,"auxilliary"))
57  {
58  std::string database_file;
59  std::string table_name;
60 
61  if(is_auxilliary_printer())
62  {
63  // If this is an "auxilliary" printer then we need to get some
64  // of our options from the primary printer
// NOTE(review): the result of the dynamic_cast is dereferenced below without a
// null check; presumably the primary printer is always an SQLitePrinter - confirm.
65  primary_printer = dynamic_cast<SQLitePrinter*>(this->get_primary_printer());
66  database_file = primary_printer->get_database_file();
67  table_name = primary_printer->get_table_name();
69  }
70  else
71  {
72  // MPI setup
73 #ifdef WITH_MPI
74  this->setRank(myComm.Get_rank()); // tells base class about rank
75  mpiRank = myComm.Get_rank();
76  mpiSize = myComm.Get_size();
77 #endif
78 
79  // Register dataset names that this printer needs to use itself
80  // ("MPIrank" and "pointID" are always automatically registered)
81  addToPrintList("pairID");
82 
83  // Tell scannerbit if we are resuming
84  set_resume(options.getValue<bool>("resume"));
85 
86  // Get path of database file where results should ultimately end up
87  std::ostringstream ff;
88  if(options.hasKey("output_path"))
89  {
90  ff << options.getValue<std::string>("output_path") << "/";
91  }
92  else
93  {
94  ff << options.getValue<std::string>("default_output_path") << "/";
95  }
96 
97  if(options.hasKey("output_file"))
98  {
99  ff << options.getValue<std::string>("output_file");
100  }
101  else
102  {
103  printer_error().raise(LOCAL_INFO, "No 'output_file' entry specified in the options section of the Printer category of the input YAML file. Please add a name there for the output sqlite database file of the scan.");
104  }
105 
106  database_file = ff.str();
107 
108  // Get the name of the data table for this run
109  table_name = options.getValueOrDef<std::string>("results","table_name");
110 
111  // Delete final target file if one with same name already exists? (and if we are restarting the run)
112  // Mostly for convenience during testing. Recommend to use 'false' for serious runs to avoid
113  // accidentally deleting valuable output.
114  bool overwrite_file = options.getValueOrDef<bool>(false,"delete_file_on_restart");
115 
// Only rank 0 performs the deletion; the other ranks wait at the barrier below
116  if(getRank()==0 and overwrite_file and not get_resume())
117  {
118  // Note: "not resume" means "start or restart"
119  // Delete existing output file
// NOTE(review): database_file is pasted into the shell command unquoted, so a
// path containing spaces or shell metacharacters would not be treated as a
// single filename.
120  std::ostringstream command;
121  command << "rm -f "<<database_file;
122  logger() << LogTags::printers << LogTags::info << "Running shell command: " << command.str() << EOM;
123  FILE* fp = popen(command.str().c_str(), "r");
124  if(fp==NULL)
125  {
126  // Error running popen
127  std::ostringstream errmsg;
128  errmsg << "rank "<<getRank()<<": Error deleting existing output file (requested by 'delete_file_on_restart' printer option; target filename is "<<database_file<<")! popen failed to run the command (command was '"<<command.str()<<"')";
129  printer_error().raise(LOCAL_INFO, errmsg.str());
130  }
131  else if(pclose(fp)!=0)
132  {
133  // Command returned exit code!=0, or pclose failed
134  std::ostringstream errmsg;
135  errmsg << "rank "<<getRank()<<": Error deleting existing output file (requested by 'delete_file_on_restart' printer option; target filename is "<<database_file<<")! Shell command failed to executed successfully, see stderr (command was '"<<command.str()<<"').";
136  printer_error().raise(LOCAL_INFO, errmsg.str());
137  }
138  }
139 
140 #ifdef WITH_MPI
141  // Make sure no processes try to open database until we are sure it won't be deleted and replaced
142  myComm.Barrier();
143 #endif
144  }
145 
146  // Create/open the database file
147  open_db(database_file,'+');
148 
149  // Create the results table in the database (if it doesn't already exist)
150  make_table(table_name);
151  set_table_name(table_name); // Inform base class of table name
152 
153  // If we are resuming and this is the primary printer, need to read the database and find the previous
154  // highest pointID numbers used for this rank
155  std::size_t my_highest_pointID=0;
156  if(not is_auxilliary_printer() and get_resume())
157  {
158  // Construct the SQLite3 statement to retrieve highest existing pointID in the database for this rank
159  std::stringstream sql;
160  sql << "SELECT MAX(pointID) FROM "<<get_table_name()<<" WHERE MPIrank="<<mpiRank;
161 
162  /* Execute SQL statement and iterate through results*/
// NOTE(review): if printer_error().raise() throws (presumably it does - confirm),
// the error paths below leave this block without reaching sqlite3_finalize(stmt).
163  sqlite3_stmt *stmt;
164  int rc = sqlite3_prepare_v2(get_db(), sql.str().c_str(), -1, &stmt, NULL);
165  if (rc != SQLITE_OK) {
166  std::stringstream err;
167  err<<"Encountered SQLite error while preparing to retrieve previous pointIDs: "<<sqlite3_errmsg(get_db());
168  printer_error().raise(LOCAL_INFO, err.str());
169  }
// Despite its name, colcount counts the result rows stepped through, not columns
170  int colcount=0;
171  while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
172  my_highest_pointID = sqlite3_column_int64(stmt, 0);
173  colcount++;
174  if(colcount>1)
175  {
176  std::stringstream err;
177  err<<"SQLite statement to retrieve highest existing pointID returned more than one result! This doesn't make sense, so there is probably a bug in the statement that was used. Statement was: "<<sql.str();
178  printer_error().raise(LOCAL_INFO, err.str());
179  }
180  }
181  if (rc != SQLITE_DONE) {
182  std::stringstream err;
183  err<<"Encountered SQLite error while retrieving previous pointIDs: "<<sqlite3_errmsg(get_db());
184  printer_error().raise(LOCAL_INFO, err.str());
185  }
186  sqlite3_finalize(stmt);
187 
188  // Need to make sure no other processes start adding new stuff before everyone has figured out
189  // their next unused pointID
190 #ifdef WITH_MPI
191  myComm.Barrier();
192 #endif
// get_resume() is already required by the enclosing branch, so this inner check
// repeats that condition.
193  if (get_resume())
194  {
195  get_point_id() = my_highest_pointID;
196  }
197 
198  // DEBUG
199  //std::cout<<"Highest pointID retrieved for rank "<<mpiRank<<" was: "<<get_point_id();
200  }
201  }
202 
204 
205  void SQLitePrinter::initialise(const std::vector<int>&)
206  {
207  // Don't need to initialise anything for this printer
208  }
209 
210  void SQLitePrinter::reset(bool force)
211  {
212  // This is needed by e.g. MultiNest to delete old weights and replace them
213  // with new ones.
214 
215  // Primary printers aren't allowed to delete stuff unless 'force' is set to true
216  if((is_auxilliary_printer() or force) and (buffer_header.size()>0))
217  {
218  // Read through header to see what columns this printer has been touching. These are
219  // the ones that we will reset/delete.
220  // (a more nuanced reset might be required in the future?)
221  std::stringstream sql;
222  sql<<"UPDATE "<<get_table_name()<<" SET ";
223  for(auto col_name_it=buffer_header.begin(); col_name_it!=buffer_header.end(); ++col_name_it)
224  {
225  sql<<"`"<<(*col_name_it)<<"`=null"<<comma_unless_last(col_name_it,buffer_header);
226  }
227  sql<<";";
228 
229  /* Execute SQL statement */
230  submit_sql(LOCAL_INFO, sql.str());
231  }
232  }
233 
234  void SQLitePrinter::finalise(bool /*abnormal*/)
235  {
236  // Dump buffer to disk. Nothing special needed for early shutdown.
237  dump_buffer();
238  }
239 
241  {
242  dump_buffer();
243  }
244 
245  // Reader construction options for constructing a reader
246  // object that can read the output we are printing
248  {
249  Options options;
250  // Set options that we need later to construct a reader object for
251  // previous output, if required.
252  options.setValue("type", "sqlite");
253  options.setValue("file", get_database_file());
254  options.setValue("table", get_table_name());
255  return options;
256  }
257 
258  // Create results table
// Create the results table 'name' if it does not already exist.
// The table starts with three columns: pairID (INT PRIMARY KEY NOT NULL),
// MPIrank (INT) and pointID (INT). The statement is run through submit_sql
// with its default arguments.
// NOTE(review): 'name' is written into the statement unquoted, so it presumably
// has to be a plain SQL identifier - confirm against callers.
259  void SQLitePrinter::make_table(const std::string& name)
260  {
261  // Construct the SQLite3 statement
262  std::stringstream sql;
263  sql << "CREATE TABLE IF NOT EXISTS "<<name<<"("
264  << "pairID INT PRIMARY KEY NOT NULL,"
265  << "MPIrank INT,"
266  << "pointID INT"
267  << ");";
268 
269  /* Execute SQL statement */
270  submit_sql(LOCAL_INFO, sql.str());
271 
272  // Flag the results table as existing
274  }
275 
276  // Check that a table column exists with the correct type, and create it if needed
// Make sure the output table has a column called sql_col_name, adding it if needed.
//
// Parameters:
//   sql_col_name - name of the column (written backtick-quoted into the SQL)
//   sql_col_type - SQL type used when the column has to be created
//
// Columns already present in column_record are accepted immediately. Otherwise
// an ALTER TABLE ... ADD COLUMN is attempted with allow_fail=true. If that
// fails, PRAGMA table_info is used to check whether the column is nevertheless
// present. A printer error is raised if the PRAGMA itself fails or if the
// column is not found. On success the column is added to column_record.
// Type mismatches are deliberately not treated as errors (see the notes inside).
277  void SQLitePrinter::ensure_column_exists(const std::string& sql_col_name, const std::string& sql_col_type)
278  {
280  auto it = column_record.find(sql_col_name);
281  if(it == column_record.end())
282  {
283  // Column not marked as existing. But it might have been
284  // created by another process, so we need to check the
285  // database directly. It seems like the best way to do
286  // this is to just attempt to add the column. If it fails
287  // we can then explicitly check the column names to
288  // make sure that the reason for failure was because
289  // the column already existed, and not some other reason.
290 
291  std::stringstream sql;
292  sql<<"ALTER TABLE "<<get_table_name()<<" ADD COLUMN `"<<sql_col_name<<"` "<<sql_col_type<<";";
293 
294  /* Execute SQL statement */
295  int rc;
296  char *zErrMsg = 0;
297  // Need allow_fail=true for this case
298  rc = submit_sql(LOCAL_INFO, sql.str(), true, NULL, NULL, &zErrMsg);
299 
300  if( rc != SQLITE_OK ){
301  // Operation failed for some reason. Probably because the column already
302  // exists, but we better make sure.
303 
304  std::stringstream sql2;
305  sql2<<"PRAGMA table_info("<<get_table_name()<<");";
306 
307  /* Execute SQL statement */
308  int rc2;
309  char *zErrMsg2 = 0;
310  std::map<std::string, std::string, Utils::ci_less> colnames; // Will be passed to and filled by the callback function
311  rc2 = submit_sql(LOCAL_INFO, sql2.str(), true, &col_name_callback, &colnames, &zErrMsg2);
312 
313  if( rc2 != SQLITE_OK ){
// Both statements failed: report both error messages, then release them
314  std::stringstream err;
315  err << "Failed to check SQL column names in output table, after failing to add a new column '"<<sql_col_name<<"' to that table."<<std::endl;
316  err << " First SQL error was: " << zErrMsg << std::endl;
317 #ifdef SQL_DEBUG
318  err << " The attempted SQL statement was:"<<std::endl;
319  err << sql.str() << std::endl;
320 #endif
321  err << " Second SQL error was: " << zErrMsg2 << std::endl;
322 #ifdef SQL_DEBUG
323  err << " The attempted SQL statement was:"<<std::endl;
324  err << sql2.str() << std::endl;
325 #endif
326  sqlite3_free(zErrMsg);
327  sqlite3_free(zErrMsg2);
328  printer_error().raise(LOCAL_INFO,err.str());
329  }
330 
331  // Operation successful, check if our column name exists and has the correct type
// colnames uses the case-insensitive comparator Utils::ci_less, so this lookup
// ignores the case of sql_col_name
332  auto jt = colnames.find(sql_col_name);
333  if(jt==colnames.end())
334  {
335  // Column not found
336  std::stringstream err;
337  err << "Failed to add new column '"<<sql_col_name<<"' to output SQL table! The ALTER TABLE operation failed, however it was not because the column already existed (we successfully checked and the column was not found). The SQL error was: " << zErrMsg << std::endl;
338 #ifdef SQL_DEBUG
339  err << "The attempted SQL statement was:"<<std::endl;
340  err << sql.str() << std::endl;
341 #endif
342  sqlite3_free(zErrMsg);
343  printer_error().raise(LOCAL_INFO,err.str());
344  }
345  else if(!Utils::iequals(jt->second,sql_col_type))
346  {
// Intentionally empty: the type check below is disabled, so a mismatch is ignored
347  // NOTE: All sorts of type names are equivalent, so this simple string checking is
348  // totally unreliable!
349 
350  // // Column found, but has the wrong type
351  // std::stringstream err;
352  // err << "Failed to add new column '"<<sql_col_name<<"' to output SQL table! The column already exists, but it has the wrong type (existing column has type '"<<jt->second<<"', but we expected it to have type '"<<sql_col_type<<"'!";
353  // sqlite3_free(zErrMsg);
354  // printer_error().raise(LOCAL_INFO,err.str());
355  }
356 
// NOTE(review): on this path zErrMsg from the failed ALTER TABLE is never passed
// to sqlite3_free, unlike on the two error paths above - looks like a leak;
// confirm how submit_sql allocates the message.
357  // Column exists and has the right type! So everything is ok after all.
358  }
359 
360  // Column should exist now. Need to add the fact of this columns existence to our internal record.
361  column_record[sql_col_name] = sql_col_type;
362  }
363  else if(!Utils::iequals(it->second,sql_col_type))
364  {
// Intentionally empty: the recorded-type check is disabled for the same reason
365  // // Records say column exists, but not with the type requested!
366  // NOTE: All sorts of type names are equivalent, so this simple string checking is
367  // totally unreliable!
368 
369  // std::stringstream err;
370  // err << "SQLitePrinter records indicated that the column '"<<sql_col_name<<"' already exists in the output table, but with a different type than has been requested (existing type is '"<<it->second<<"', requested type was '"<<sql_col_type<<"'). This indicates either duplicate names in the printer output, or an inconsistency in how the print commands have been issued.";
371  // printer_error().raise(LOCAL_INFO,err.str());
372  }
373  // else column exists and type matches, proceed!
374  }
375 
376  // Queue data for a table insert operation into the SQLitePrinter internal buffer
// Queue one value for later writing to the output table.
//
// Parameters:
//   mpirank, pointID - identify the row; combined into the row key by pairfunc
//   col_name         - column the value belongs to
//   col_type         - SQL type of that column
//   data             - the value, stored in the buffer exactly as given
//
// The column is created in the output table if necessary (ensure_column_exists).
// A buffer row is created for the row key if there is none yet, and a buffer
// column is created for col_name if there is none yet; cells without data hold
// the string "null". A printer error is raised if col_type differs
// (case-insensitively) from the type already buffered for col_name, or if the
// internal consistency checks on the buffer sizes fail.
377  void SQLitePrinter::insert_data(const unsigned int mpirank, const unsigned long pointID, const std::string& col_name, const std::string& col_type, const std::string& data)
378  {
379  // Get the pairID for this rank/pointID combination
380  std::size_t rowID = pairfunc(mpirank,pointID);
381 
382  // Make sure we have a record of this column existing in the output table
383  // Create it if needed.
384  ensure_column_exists(col_name, col_type);
385 
386  // Check if a row for this data exists in the transaction buffer
387  auto buf_it=transaction_data_buffer.find(rowID);
388  if(buf_it==transaction_data_buffer.end())
389  {
390  // Nope, no row yet for this rowID. Add it.
391  // But we should first dump the buffer if it was full
392 
393  // If the buffer is full, execute a transaction to write
394  // data to disk, and clear the buffer
396  {
397  dump_buffer();
398  }
399 
400  // Data is set to 'null' until we add some.
// The new row gets one "null" cell per column currently known to buffer_info
401  std::size_t current_row_size=buffer_info.size();
402  transaction_data_buffer.emplace(rowID,std::vector<std::string>(current_row_size,"null"));
403  }
404 
405  // Check if this column exists in the current output buffer
406  // Create it if needed
407  auto it=buffer_info.find(col_name);
408  if(it==buffer_info.end())
409  {
410  // Column doesn't exist in buffer. Add it.
// The column's index is its position in insertion order, matching buffer_header
411  std::size_t next_col_index = buffer_info.size();
412  buffer_info[col_name] = std::make_pair(next_col_index,col_type);
413 
414  // Add header data
415  //std::cout<<"Adding column to buffer: "<<col_name<<std::endl;
416  buffer_header.push_back(col_name);
417  if(buffer_info.size()!=buffer_header.size())
418  {
419  std::stringstream err;
420  err<<"Size of buffer_header ("<<buffer_header.size()<<") does not match buffer_info ("<<buffer_info.size()<<"). This is a bug, please report it.";
421  printer_error().raise(LOCAL_INFO,err.str());
422  }
423 
424  // Add buffer space
425  for(auto jt=transaction_data_buffer.begin();
426  jt!=transaction_data_buffer.end(); ++jt)
427  {
428  std::vector<std::string>& row = jt->second;
429 
430  // Add new empty column to every row
431  // Values are null until we add them
432  row.push_back("null");
433 
434  // Make sure size is correct
435  if(row.size()!=buffer_header.size())
436  {
// The message labels its second number as buffer_header but prints
// buffer_info.size(); the two sizes were checked to be equal just above.
437  std::stringstream err;
438  err<<"Size of a row in the transaction_data_buffer ("<<row.size()<<") does not match buffer_header ("<<buffer_info.size()<<"). This is a bug, please report it.";
439  printer_error().raise(LOCAL_INFO,err.str());
440  }
441  }
442 
443  // Now point the map iterator to the right place
444 
445  it=buffer_info.find(col_name);
446  }
447  else
448  {
449  // Column exists in buffer, but we should also make sure the
450  // type is consistent with the new data we are adding
451  std::string buffer_col_type = it->second.second;
452  if(!Utils::iequals(buffer_col_type,col_type))
453  {
454  std::stringstream err;
455  err<<"Attempted to add data for column '"<<col_name<<"' to SQLitePrinter transaction buffer, but the type of the new data ("<<col_type<<") does not match the type already recorded for this column in the buffer ("<<buffer_col_type<<").";
456  printer_error().raise(LOCAL_INFO,err.str());
457  }
458  }
459 
460  // Add the data to the transaction buffer
// .at() is used for both lookups, so a missing row or out-of-range column index
// throws rather than writing out of bounds
461  std::size_t col_index = it->second.first;
462  transaction_data_buffer.at(rowID).at(col_index) = data;
463  }
464 
465  // Delete all buffer data. Leaves the header intact so that we know what columns
466  // this printer has been working with (needed so we can reset them if needed!)
468  {
469  transaction_data_buffer.clear();
470  }
471 
472  // Create an SQL table insert operation for the current transaction_data_buffer
473  // Modifies 'sql' stringstream in-place
474  void SQLitePrinter::turn_buffer_into_insert(std::stringstream& sql, const std::string& table)
475  {
476  sql<<"INSERT INTO "<<table<<" (\npairID,\n";
477  for(auto col_name_it=buffer_header.begin(); col_name_it!=buffer_header.end(); ++col_name_it)
478  {
479  sql<<"`"<<(*col_name_it)<<"`"<<comma_unless_last(col_name_it,buffer_header)<<"\n";
480  }
481  sql<<") VALUES ";
482  for(auto row_it=transaction_data_buffer.begin();
483  row_it!=transaction_data_buffer.end(); ++row_it)
484  {
485  sql<<"(\n";
486  std::size_t pairID = row_it->first;
487  std::vector<std::string>& row = row_it->second;
488  sql<<pairID<<",\n";
489  for(auto col_it=row.begin(); col_it!=row.end(); ++col_it)
490  {
491  sql<<(*col_it)<<comma_unless_last(col_it,row)<<"\n";
492  }
493  sql<<")\n"<<comma_unless_last(row_it,transaction_data_buffer);
494  }
495  sql<<";"; // End statement
496  }
497 
498  // Execute an SQLite transaction to write the buffer to the output table
500  {
501  // Add the table INSERT operation to a stream
502  std::stringstream sql;
504 
505  //std::cout<<sql.str(); // DEBUG
506 
507  /* Execute SQL statement */
508  submit_sql(LOCAL_INFO,sql.str());
509  }
510 
512  {
513  std::stringstream sql;
514  // So for this it seems like the best thing to do is create a temporary
515  // table with this new data, and then update the main output table from
516  // this. Otherwise we have to write tonnes of separate 'update' statements,
517  // which is probably not very fast.
518  // So first we need to create the temporary table.
519  sql << "DROP TABLE IF EXISTS temp_table;\n"
520  << "CREATE TEMPORARY TABLE temp_table("
521  << "pairID INT PRIMARY KEY NOT NULL,\n";
522  for(auto col_it=buffer_info.begin(); col_it!=buffer_info.end(); ++col_it)
523  {
524  const std::string& col_name(col_it->first);
525  const std::string& col_type(col_it->second.second);
526  sql<<"`"<<col_name<<"` "<<col_type<<comma_unless_last(col_it,buffer_info)<<"\n";
527  }
528  sql <<");\n";
529 
530  // Insert data into the temporary table
531  turn_buffer_into_insert(sql,"temp_table");
532 
533  // Update the primary output table using the temporary table
534  // Following: https://stackoverflow.com/a/47753166/1447953
535  sql<<"UPDATE "<<get_table_name()<<" SET (\n";
536  for(auto col_name_it=buffer_header.begin(); col_name_it!=buffer_header.end(); ++col_name_it)
537  {
538  sql<<*col_name_it<<comma_unless_last(col_name_it,buffer_header)<<"\n";
539  }
540  sql<<") = (SELECT \n";
541  for(auto col_name_it=buffer_header.begin(); col_name_it!=buffer_header.end(); ++col_name_it)
542  {
543  sql<<"temp_table."<<*col_name_it<<comma_unless_last(col_name_it,buffer_header)<<"\n";
544  }
545  sql<<" FROM temp_table WHERE temp_table.pairID = "<<get_table_name()<<".pairID)\n";
546  sql<<" WHERE EXISTS ( SELECT * FROM temp_table WHERE temp_table.pairID = "<<get_table_name()<<".pairID);\n";
547 
548  /* Execute SQL statement */
549  submit_sql(LOCAL_INFO,sql.str());
550  }
551 
553  {
555  // Don't try to dump the buffer if it is empty!
556  if(transaction_data_buffer.size()>0)
557  {
558  if(synchronised)
559  {
560  // Primary dataset writes can be performed as INSERT operations
562  }
563  else
564  {
565  // Asynchronous ('auxilliary') writes need to be performed as UPDATE operations
567  }
568  // Clear all the buffer data
569  clear_buffer();
570  }
571  }
572 
573  }
574 }
void set_table_name(const std::string &table_name)
Definition: sqlitebase.cpp:324
greatScanData data
Definition: great.cpp:38
int submit_sql(const std::string &local_info, const std::string &sqlstr, bool allow_fail=false, sql_callback_fptr callback=NULL, void *data=NULL, char **zErrMsg=NULL)
Definition: sqlitebase.cpp:258
EXPORT_SYMBOLS error & printer_error()
Printer errors.
#define LOCAL_INFO
Definition: local_info.hpp:34
void initialise(const std::vector< int > &)
Virtual function overloads:
std::map< std::size_t, std::vector< std::string > > transaction_data_buffer
STL namespace.
void reset(bool force=false)
Logging access header for GAMBIT.
EXPORT_SYMBOLS unsigned long long int & get_point_id()
Returns unique pointID.
void finalise(bool abnormal=false)
std::map< std::string, std::string, Utils::ci_less > column_record
bool hasKey(const args &... keys) const
Getters for key/value pairs (which is all the options node should contain)
TYPE getValue(const args &... keys) const
SQLite printer class declaration.
std::vector< std::string > buffer_header
EXPORT_SYMBOLS bool iequals(const std::string &a, const std::string &b, bool case_sensitive=false)
Perform a (possibly) case-insensitive string comparison.
const Logging::endofmessage EOM
Explicit const instance of the end of message struct in Gambit namespace.
Definition: logger.hpp:100
std::size_t max_buffer_length
Buffer variable.
EXPORT_SYMBOLS Logging::LogMaster & logger()
Function to retrieve a reference to the Gambit global log object.
Definition: logger.cpp:95
std::string comma_unless_last(Iter it, const Cont &c)
Definition: sqlitebase.hpp:82
int col_name_callback(void *colmap_in, int, char **data, char **)
Definition: sqlitebase.cpp:43
TYPE getValueOrDef(TYPE def, const args &... keys) const
void turn_buffer_into_insert(std::stringstream &sql, const std::string &table)
void make_table(const std::string &)
void open_db(const std::string &, char access='r')
Definition: sqlitebase.cpp:164
std::map< std::string, std::pair< std::size_t, std::string >, Utils::ci_less > buffer_info
void insert_data(const unsigned int mpirank, const unsigned long pointID, const std::string &col_name, const std::string &col_type, const std::string &data)
void setValue(const KEYTYPE &key, const VALTYPE &val)
Basic setter, for adding extra options.
The main printer class for output to SQLite database.
void ensure_column_exists(const std::string &, const std::string &)
SQLite base class for both reader and writer.
Definition: sqlitebase.hpp:93
TODO: see if we can use this one:
Definition: Analysis.hpp:33
std::size_t pairfunc(const std::size_t i, const std::size_t j)
Definition: sqlitebase.hpp:36
A small wrapper object for 'options' nodes.
SQLitePrinter(const Options &, BasePrinter *const primary=NULL)
Constructor (for construction via inifile options)