GAMBIT  v1.5.0-2191-ga4742ac
A Global And Modular BSM Inference Tool
VertexBufferNumeric1D.hpp
1 // GAMBIT: Global and Modular BSM Inference Tool
2 // *********************************************
21 
22 #ifndef __VertexBufferNumeric1D_hpp__
23 #define __VertexBufferNumeric1D_hpp__
24 
25 #include <cstddef>
26 #include <sstream>
27 #include <iostream>
28 
29 // HDF5 C bindings
30 #include <hdf5.h>
31 
32 // Gambit
36 
37 // MPI bindings
39 
40 //#define BUF_DEBUG /* Triggers debugging output */
41 //#define MONITOR_BUF "pointID" /* String ID of buffer to monitor. */
42 
43 
44 namespace Gambit {
45 
46  namespace Printers {
47 
48  /// Struct for a collection of MPI tags belonging to a single buffer
49  struct BuffTags
50  {
51  int SYNC_data; // message contains T buffer_entries[LENGTH]
52  int SYNC_valid; // " " bool buffer_valid[LENGTH]
53  int RA_queue; // " " T RA_write_queue[LENGTH]
54  int RA_loc; // " " PPIDpair RA_write_locations[LENGTH]
55  int RA_length; // " " uint RA_queue_length (a single scalar, not an array)
56 
57  static const std::size_t NTAGS=5;
58 
59  bool valid;
60 
61  BuffTags()
62  : SYNC_data (-1)
63  , SYNC_valid(-1)
64  , RA_queue (-1)
65  , RA_loc (-1)
66  , RA_length (-1)
67  , valid(false)
68  {}
69 
70  BuffTags(const int first_tag)
71  : SYNC_data (first_tag)
72  , SYNC_valid(first_tag+1)
73  , RA_queue (first_tag+2)
74  , RA_loc (first_tag+3)
75  , RA_length (first_tag+4)
76  , valid(true)
77  {
78  if(first_tag==-1)
79  {
80  valid=false;
81  }
82  else if(first_tag<FIRST_EMPTY_TAG)
83  {
84  std::ostringstream errmsg;
85  errmsg << "Error! Tried to create (valid) BuffTags from first_tag<FIRST_EMPTY_TAG ("<<first_tag<<"<"<<FIRST_EMPTY_TAG<<") (i.e. using reserved, or invalid, tag values)";
86  printer_error().raise(LOCAL_INFO, errmsg.str());
87  }
88  }
89  };
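// Editorial annotation: the constructor above hands out NTAGS=5 consecutive MPI
// tags, i.e. BuffTags(t) gives SYNC_data=t, SYNC_valid=t+1, RA_queue=t+2,
// RA_loc=t+3 and RA_length=t+4 (for any t >= FIRST_EMPTY_TAG), so a tag manager
// could assign the next buffer first_tag = t + BuffTags::NTAGS.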
90 
91  /// VertexBuffer for simple numerical types
92  template<class T, std::size_t LENGTH>
93  class VertexBufferNumeric1D : public VertexBufferBase
94  {
95  protected:
97  // Using arrays as these are easier to write to hdf5
98  bool buffer_valid[LENGTH]; // Array telling us which buffer entries are properly filled
99  T buffer_entries[LENGTH];
100 
101  // DEPRECATED! No more MPI stuff needed.
102  // /// Buffers to store data waiting to be sent via MPI
103  // #ifdef WITH_MPI
104  // BuffTags myTags; // Collection of MPI tags needed for passing messages
105  // GMPI::Comm printerComm; // MPI communicator object from printer
106 
107  // int send_buffer_valid[LENGTH];
108  // T send_buffer_entries[LENGTH];
109  // bool send_buffer_ready = true; // flag to signal if send buffer can be filled with new data.
110 
111  // // Request handles for tracking status of a sent message
112  // MPI_Request req_valid =MPI_REQUEST_NULL;
113  // MPI_Request req_entries=MPI_REQUEST_NULL;
114 
115  // // Status handles
116  // MPI_Status stat_valid;
117  // MPI_Status stat_entries;
118  // #endif
119  // /// @}
120 
123  T RA_write_queue[LENGTH];
125  PPIDpair RA_write_locations[LENGTH];
127  uint RA_queue_length = 0;
128 
129  // DEPRECATED! No more MPI stuff needed
130  // #ifdef WITH_MPI
131  // /// MPI buffers, flags, and status+request handles for RA messages
132  // T send_buffer_RA_write_q[LENGTH];
133  // PPIDpair send_buffer_RA_write_loc[LENGTH];
134  // uint send_buffer_RA_q_len;
135  // uint null_message;
136  // bool RA_send_buffer_ready = true; // flag to signal if send buffer can be filled with new data.
137 
138  // // Request handles for tracking status of a sent message
139  // MPI_Request req_RA_write_q =MPI_REQUEST_NULL;
140  // MPI_Request req_RA_write_loc=MPI_REQUEST_NULL;
141  // MPI_Request req_RA_q_len =MPI_REQUEST_NULL;
142  // MPI_Request req_RA_SENT =MPI_REQUEST_NULL;
143 
144  // // Status handles
145  // MPI_Status stat_RA_write_q;
146  // MPI_Status stat_RA_write_loc;
147  // MPI_Status stat_RA_q_len;
148  // MPI_Status stat_RA_SENT;
149  // #endif
150  // /// @}
151 
153  uint myRank = 0;
154 
155  private:
156  static const std::size_t bufferlength = LENGTH;
157 
158  /// Variable to check that "append" is not called twice in a row for the same scan point
159  PPIDpair PPID_of_last_append;
160 
161  /// Special value of the above used to skip the double-append check
162  static const PPIDpair null_PPID;
163 
164  public:
166  VertexBufferNumeric1D()
167  : VertexBufferBase()
168  , buffer_valid()
169  , buffer_entries()
170  , PPID_of_last_append(null_PPID)
171  {}
172 
173  // DEPRECATED! No more MPI needed
174  // #ifdef WITH_MPI
175  // VertexBufferNumeric1D(
176  // const std::string& label
177  // , const int vID
178  // , const unsigned int i
179  // , const bool sync
180  // , const bool sil
181  // , const bool resume
182  // , const BuffTags& tags
183  // , const GMPI::Comm& pComm
184  // ): VertexBufferBase(label,vID,i,sync,sil,resume,true)
185  // , buffer_valid()
186  // , buffer_entries()
187  // , myTags(tags)
188  // , printerComm(pComm)
189  // , PPID_of_last_append(null_PPID)
190  // {
191  // myRank = pComm.Get_rank();
192 
193  // //Debugging
194  // #ifdef BUF_DEBUG
195  // std::cout<<this->get_label()<<": My tags are: "
196  // <<tags.SYNC_data <<", "
197  // <<tags.SYNC_valid <<", "
198  // <<tags.RA_queue <<", "
199  // <<tags.RA_loc <<", "
200  // <<tags.RA_length <<", "
201  // <<std::endl;
202  // #endif
203  // }
204  // #endif
205 
206  // No-MPI constructor. Can also be used with MPI, if output
207  // is to be combined post-run.
208  VertexBufferNumeric1D(
209  const std::string& label
210  , const int vID
211  , const unsigned int i
212  , const bool sync
213  , const bool sil
214  , const bool resume
215  , const char access
216  ): VertexBufferBase(label,vID,i,sync,sil,resume,false,access)
217  , buffer_valid()
218  , buffer_entries()
219  // #ifdef WITH_MPI
220  // , myTags()
221  // , printerComm()
222  // #endif
223  , PPID_of_last_append(null_PPID)
224  {
225  // #ifdef WITH_MPI
226  // myRank = printerComm.Get_rank();
227  // #endif
228  }
229 
230 
232  virtual ~VertexBufferNumeric1D()
233  {}
234 
236  void append(const T& value, const PPIDpair pID = null_PPID);
237 
239  virtual unsigned long dset_head_pos() = 0;
240 
242  virtual void update_dset_head_pos() = 0;
243 
245  void RA_write(const T& value, const PPIDpair pID, const std::map<PPIDpair, ulong>& PPID_to_dsetindex);
246 
248  virtual void skip_append();
249 
254  virtual void N_skip_append(ulong N);
255 
256  // Trigger MPI send of sync buffer to master node, or write to disk
257  virtual void flush();
258 
259  // Trigger MPI send of random-access buffer to master node, or write to disk
260  virtual void RA_flush(const std::map<PPIDpair, ulong>& PPID_to_dsetindex);
261 
262  // Perform write to disk of sync buffer
263  virtual void write_to_disk() = 0;
264 
265  // Perform write to disk of random-access buffer
266  virtual void RA_write_to_disk(const std::map<PPIDpair, ulong>& PPID_to_dsetindex) = 0;
267 
269  virtual void write_external_to_disk(const T (&values)[LENGTH], const bool (&isvalid)[LENGTH]) = 0;
270 
271  // #ifdef WITH_MPI
272  // // Probe for a sync buffer MPI message from a process
273  // virtual bool probe_sync_mpi_message(uint,int*);
274 
275  // // Probe for a RA buffer MPI message from a process
276  // virtual bool probe_RA_mpi_message(uint);
277 
278  // // Retrieve sync buffer data from an MPI message from a known process rank
279  // // Should only be triggered if a valid message is known to exist to be retrieved!
280  // virtual void get_sync_mpi_message(uint,int);
281 
282  // // Retrieve RA buffer data from an MPI message from a known process rank
283  // // Should only be triggered if a valid message is known to exist to be retrieved!
284  // virtual void get_RA_mpi_message(uint, const std::map<PPIDpair, ulong>& PPID_to_dsetindex);
285 
286  // // Update myTags with valid values
287  // virtual void update_myTags(uint);
288  // #endif
289 
290  // Report queue length (e.g. for checking that it is empty during finalise)
291  virtual uint get_RA_queue_length() { return RA_queue_length; }
292 
294  T get_entry(const std::size_t i) const;
295 
297  void clear();
298 
299  };
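// Usage sketch (editorial annotation, not from the GAMBIT sources): this class
// is abstract (write_to_disk() and friends are pure virtual), so the HDF5
// printer drives it through a concrete subclass. With a hypothetical subclass
// `MyHDF5Buffer<double,1000>` the calling pattern is roughly:
//
//   MyHDF5Buffer<double,1000> buf("LogLike", /*vID*/0, /*index*/0,
//                                 /*sync*/true, /*silence*/false,
//                                 /*resume*/false, /*access*/'w');
//   // once per scan point: append a value, or mark the slot invalid
//   buf.append(value, current_ppid);        // or: buf.skip_append();
//   // when the buffer reports full, the printer empties it:
//   buf.flush();                            // write_to_disk() then clear()
//
// Here `value` and `current_ppid` are placeholders for data supplied by the
// printer for the scan point currently being processed.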
300 
302 
304  template<class T, std::size_t L>
305  const PPIDpair VertexBufferNumeric1D<T,L>::null_PPID = PPIDpair(); // The default constructor for PPID includes a flag marking it as "invalid"
306 
308 
310 
312  template<class T, std::size_t L>
313  void VertexBufferNumeric1D<T,L>::append(const T& data, const PPIDpair pID)
314  {
315  if(not this->is_silenced())
316  {
317  //std::cout<<"rank "<<myRank<<": Buffer "<<this->get_label()<<", head_position ("<<this->get_head_position()<<"), pID ("<<pID.pointID<<","<<pID.rank<<"): running append()"<<std::endl;
318 
319  if(pID!=null_PPID and pID==PPID_of_last_append)
320  {
321  std::ostringstream errmsg;
322  errmsg << "Error! Tried to append data to buffer "<<this->get_label()<<" (vID="<<this->get_vertexID()<<", index="<<this->get_index()<<") but supplied PPID matches PPID_of_last_append, i.e. the previous append was to the same point (rank="<<pID.rank<<", pointID="<<pID.pointID<<", [valid="<<PPID_of_last_append.valid<<"])! This indicates a bug in the buffer calling code.";
323  printer_error().raise(LOCAL_INFO, errmsg.str());
324  }
325 
326  if(sync_buffer_is_full())
327  {
328  std::ostringstream errmsg;
329  errmsg << "Error! Tried to append data to buffer "<<this->get_label()<<" (vID="<<this->get_vertexID()<<", index="<<this->get_index()<<") but the sync buffer is full! It should have been sent to the master node via MPI or written to disk before now.";
330  printer_error().raise(LOCAL_INFO, errmsg.str());
331  }
332 
333  // Debug dump
334  #ifdef BUF_DEBUG
335  #ifdef MONITOR_BUF
336  if(this->get_label()==MONITOR_BUF) {
337  #endif
338  std::cout<<"-------------------------------------"<<std::endl;
339  std::cout<<"rank "<<myRank<<": Called 'VertexBufferNumeric1D<T,L>::append'"<<std::endl;
340  std::cout<<"rank "<<myRank<<": Dump from buffer '"<<this->get_label()<<"'"<<std::endl;
341  std::cout<<"rank "<<myRank<<": dset_head_pos() = "<<dset_head_pos()<<std::endl;
342  std::cout<<"rank "<<myRank<<": donepoint() = "<<this->donepoint()<<std::endl;
343  std::cout<<"rank "<<myRank<<": After write, will increment head_position: "<<this->get_head_position()<<" --> "<<this->get_head_position()+1<<std::endl;
344  #ifdef MONITOR_BUF
345  }
346  #endif
347  #endif
348 
349  error_if_done(); // make sure buffer hasn't written to the current point already
350  buffer_entries[this->get_head_position()] = data;
351  buffer_valid[this->get_head_position()] = true;
352  this->move_head_to_next_slot();
353  this->sync_buffer_empty = false;
354  if(this->get_head_position()==bufferlength)
355  {
356  #ifdef BUF_DEBUG
357  #ifdef MONITOR_BUF
358  if(this->get_label()==MONITOR_BUF) {
359  #endif
360  std::cout<<"rank "<<myRank<<": Buffer "<<this->get_label()<<": head_position ("<<this->get_head_position()<<") == bufferlength ("<<bufferlength<<"); setting sync_buffer_full=true."<<std::endl;
361  #ifdef MONITOR_BUF
362  }
363  #endif
364  #endif
365  this->sync_buffer_full = true;
366  }
367  PPID_of_last_append = pID;
368  }
369  }
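// Editorial annotation, worked example of the bookkeeping above: with LENGTH=3,
// three append()/skip_append() calls move the head position 0 -> 1 -> 2 -> 3;
// at head_position == bufferlength the sync_buffer_full flag is set, and a
// fourth append() without an intervening flush() raises a printer_error. A
// second append() for the same (rank, pointID) pair is also an error, unless
// null_PPID is passed to skip the PPID_of_last_append check.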
370 
372  template<class T, std::size_t L>
373  void VertexBufferNumeric1D<T,L>::skip_append()
374  {
375  if(not this->is_silenced()) {
376  //std::cout<<"rank "<<myRank<<": Buffer "<<this->get_label()<<", head_position ("<<this->get_head_position()<<"): running skip_append()"<<std::endl;
377  if(sync_buffer_is_full())
378  {
379  std::ostringstream errmsg;
380  errmsg << "Error! Tried to skip_append (null-)data to buffer "<<this->get_label()<<" but the sync buffer is full! It should have been sent to the master node via MPI or written to disk before now.";
381  printer_error().raise(LOCAL_INFO, errmsg.str());
382  }
383  error_if_done(); // make sure buffer hasn't written to the current point already
384  buffer_valid[this->get_head_position()] = false;
385  this->move_head_to_next_slot();
386  this->sync_buffer_empty = false;
387  if(this->get_head_position()==bufferlength)
388  {
389  #ifdef BUF_DEBUG
390  #ifdef MONITOR_BUF
391  if(this->get_label()==MONITOR_BUF) {
392  #endif
393  std::cout<<"rank "<<myRank<<": Buffer "<<this->get_label()<<": head_position ("<<this->get_head_position()<<") == bufferlength ("<<bufferlength<<"); setting sync_buffer_full=true."<<std::endl;
394  #ifdef MONITOR_BUF
395  }
396  #endif
397  #endif
398  this->sync_buffer_full = true;
399  }
400  }
401  }
402 
404  template<class T, std::size_t L>
405  void VertexBufferNumeric1D<T,L>::flush()
406  {
407  if(not this->is_silenced()) {
408  // #ifdef WITH_MPI
409  // // Prepare to send buffer data to master node
410  // const int masterRank = 0;
411  // if(this->MPI_mode()==true and myRank!=masterRank)
412  // { // MPI-mode worker node instructions
413  // if(not send_buffer_ready)
414  // {
415  // // Make sure previous messages are out of the send buffer before sending new ones.
416  // MPI_Wait(&req_valid, &stat_valid);
417  // MPI_Wait(&req_entries, &stat_entries);
418  // send_buffer_ready = true;
419  // }
420  // /// Compute how many points are to be sent to the master
421  // /// (should be a full buffers worth, except at the end of the
422  // /// run)
423  // std::size_t n_points_to_send = this->get_head_position(); //head should point to the index of the next empty buffer slot (or just past the end of the buffer), which is the number of slots preceding it, which are the ones we want to send. Should be equal to 'bufferlength' most of the time.
424 
425  // /// Copy buffer data into the send buffer
426  // for(uint i=0; i<n_points_to_send; i++)
427  // {
428  // send_buffer_valid[i] = buffer_valid[i];
429  // send_buffer_entries[i] = buffer_entries[i];
430  // }
431 
432  // /// Check that we actually have a set of valid tags
433  // /// If we don't have them yet, throw an error.
434  // /// Should be retrieved after
435  // /// one loop of the master, so if it is a whole buffer
436  // /// length behind then something is probably wrong.
437  // /// If we need to deal with this possibility (very slow
438  // /// loop on master) then some rethinking is needed here.
439  // if(not myTags.valid)
440  // {
441  // std::ostringstream errmsg;
442  // errmsg << "Error! Buffer "<<this->get_label()<<" (sync) is full, but MPI tags have not yet been received from the master process! These should have been sent one loop of the master after the creation of this buffer, and it is now one bufferlength since then, so it seems that the master is stuck relative to this process. This could potentially happen legitimately, but unfortunately the hdf5printer can't handle this corner case just yet.";
443  // printer_error().raise(LOCAL_INFO, errmsg.str());
444  // }
445 
446  // /// Perform non-blocking sends
447  // #ifdef MPI_DEBUG
448  // std::cout<<"rank "<<myRank<<"; buffer '"<<this->get_label()<<"': Isend-ing buffers to master"<<std::endl;
449  // #endif
450  // this->printerComm.Isend(send_buffer_valid, n_points_to_send, masterRank, this->myTags.SYNC_valid, &req_valid);
451  // this->printerComm.Isend(send_buffer_entries, n_points_to_send, masterRank, this->myTags.SYNC_data, &req_entries);
452  // send_buffer_ready = false;
453  // }
454  // else
455  // { // Master node instructions
456  // // (and worker node instructions in non-MPI mode)
457  // write_to_disk();
458  // }
459  // #else
460  write_to_disk();
461  // #endif
462  clear();
463  }
464  }
465 
467  template<class T, std::size_t L>
468  void VertexBufferNumeric1D<T,L>::RA_flush(const std::map<PPIDpair, ulong>& PPID_to_dsetindex)
469  {
470  if(this->is_synchronised())
471  {
472  std::ostringstream errmsg;
473  errmsg << "rank "<<this->myRank<<": Error! Non-synchronised buffer attempted to perform RA_flush! Only non-synchronised buffers are permitted to do this. (buffer name = "<<this->get_label()<<")";
474  printer_error().raise(LOCAL_INFO, errmsg.str());
475  }
476 
477  if(not this->is_silenced()) {
478  // DEPRECATED! All processes now write their output independently, and we combine it at the end.
479  // #ifdef WITH_MPI
480  // // Prepare to send buffer data to master node
481  // const int masterRank = 0;
482  // if(this->MPI_mode()==true and myRank!=masterRank and RA_queue_length!=0)
483  // { // Worker node instructions
484  // if(not RA_send_buffer_ready)
485  // {
486  // // Make sure previous messages are out of the send buffer before sending new ones.
487  // MPI_Wait(&req_RA_write_q, &stat_RA_write_q);
488  // MPI_Wait(&req_RA_write_loc, &stat_RA_write_loc);
489  // MPI_Wait(&req_RA_q_len, &stat_RA_q_len);
490  // MPI_Wait(&req_RA_SENT, &stat_RA_SENT);
491  // RA_send_buffer_ready = true;
492  // }
493  // /// Copy buffer data into the send buffer
494  // send_buffer_RA_q_len = RA_queue_length;
495  // for(uint i=0; i<RA_queue_length; i++)
496  // {
497  // send_buffer_RA_write_q[i] = RA_write_queue[i];
498  // send_buffer_RA_write_loc[i] = RA_write_locations[i];
499  // }
500 
501  // /// Check that we actually have a set of valid tags
502  // /// If we don't have them yet, throw an error.
503  // /// Should be retrieved after
504  // /// one loop of the master, so if it is a whole buffer
505  // /// length behind then something is probably wrong.
506  // /// If we need to deal with this possibility (very slow
507  // /// loop on master) then some rethinking is needed here.
508  // if(not myTags.valid)
509  // {
510  // std::ostringstream errmsg;
511  // errmsg << "Error! Buffer "<<this->get_label()<<" (RA) is full, but MPI tags have not yet been received from the master process! These should have been sent one loop of the master after the creation of this buffer, and it is now one bufferlength since then, so it seems that the master is stuck relative to this process. This could potentially happen legitimately, but unfortunately the hdf5printer can't handle this corner case just yet.";
512  // printer_error().raise(LOCAL_INFO, errmsg.str());
513  // }
514 
515  // /// Perform non-blocking sends
516  // #ifdef MPI_DEBUG
517  // std::cout<<"rank "<<myRank<<"; buffer '"<<this->get_label()<<"': Isend-ing RA buffers to master"<<std::endl;
518  // #endif
519  // this->printerComm.Isend(&send_buffer_RA_write_q, bufferlength, masterRank, this->myTags.RA_queue, &req_RA_write_q);
520  // this->printerComm.Isend(&send_buffer_RA_write_loc, bufferlength, masterRank, this->myTags.RA_loc, &req_RA_write_loc);
521  // this->printerComm.Isend(&send_buffer_RA_q_len, 1, masterRank, this->myTags.RA_length, &req_RA_q_len);
522  // this->printerComm.Isend(&null_message, 1, masterRank, RA_BUFFERS_SENT, &req_RA_SENT);
523  // RA_send_buffer_ready = false;
524  // }
525  // else
526  // { // Master node instructions
527  // // (and worker node instructions in non-MPI mode)
528  // RA_write_to_disk(PPID_to_dsetindex);
529  // }
530  // #else
531  RA_write_to_disk(PPID_to_dsetindex);
532  // #endif
533  RA_queue_length = 0;
534  }
535  }
536 
537 
539  template<class T, std::size_t L>
540  void VertexBufferNumeric1D<T,L>::RA_write(const T& value, const PPIDpair pID, const std::map<PPIDpair, ulong>& PPID_to_dsetindex)
541  {
542  uint i = RA_queue_length;
543  if(i>=L)
544  {
545  std::ostringstream errmsg;
546  errmsg << "Error! Attempted to do RA_write beyond end of RA buffer ("<<i<<" >= "<<L<<")! (buffer name="<<this->get_label()<<")";
547  printer_error().raise(LOCAL_INFO, errmsg.str());
548  }
549 
550  if(not this->is_silenced()) {
551  RA_write_queue[i] = value;
552  RA_write_locations[i] = pID;
553  RA_queue_length += 1;
554  if(RA_queue_length==L)
555  {
556  RA_flush(PPID_to_dsetindex);
557  }
558  }
559  }
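// Editorial annotation: sketch of the random-access path. `rabuf` stands for a
// hypothetical concrete, non-synchronised buffer instance, and
// `PPID_to_dsetindex` for the printer-maintained map from (rank, pointID)
// pairs to dataset rows:
//
//   rabuf.RA_write(new_value, earlier_ppid, PPID_to_dsetindex); // queued
//   // ... the queue flushes itself to disk once it holds L entries ...
//   rabuf.RA_flush(PPID_to_dsetindex); // push any partial queue, e.g. at finalise
//
// RA_flush() on a synchronised buffer raises a printer_error, as enforced above.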
560 
561  // DEPRECATED! No longer passing data around via MPI. Each process just writes independently and we combine it at the end.
562  // #ifdef WITH_MPI
563  // // Probe for a sync buffer MPI message from a process
564  // template<class T, std::size_t L>
565  // bool VertexBufferNumeric1D<T,L>::probe_sync_mpi_message(uint source, int* msgsize)
566  // {
567  // this->MPImode_only(LOCAL_INFO); // throws error if MPI_mode()==false
568  // if(not myTags.valid)
569  // {
570  // // Cannot probe for messages until we receive our MPI tags. Ignore them for now
571  // std::cout<<"Attempted to probe for sync MPI messages in buffer "<<this->get_label()<<", but skipping this attempt since MPI tags have not yet been delivered"<<std::endl;
572  // return false;
573  // }
574 
575  // MPI_Status status;
576  // bool is_data_msg = printerComm.Iprobe(source, myTags.SYNC_data, &status);
577  // int msgsize_data = GMPI::Get_count<T>(&status);
578  // bool is_valid_msg = printerComm.Iprobe(source, myTags.SYNC_valid, &status);
579  // int msgsize_valid = GMPI::Get_count<int>(&status);
580  //
581  // if(msgsize_data != msgsize_valid)
582  // {
583  // std::ostringstream errmsg;
584  // errmsg << "Error in buffer "<<this->get_label()<<" during probe_sync_mpi_message! Length of 'data' message ("<<msgsize_data<<") does not match length of 'validity' message ("<<msgsize_valid<<").";
585  // printer_error().raise(LOCAL_INFO, errmsg.str());
586  // }
587  // *msgsize = msgsize_data;
588 
589  // return (is_data_msg or is_valid_msg);
590  // }
591 
592  // // Probe for a random-access buffer MPI message from a process
593  // template<class T, std::size_t L>
594  // bool VertexBufferNumeric1D<T,L>::probe_RA_mpi_message(uint source)
595  // {
596  // this->MPImode_only(LOCAL_INFO); // throws error if MPI_mode()==false
597  // if(not myTags.valid)
598  // {
599  // // Cannot probe for messages until we receive our MPI tags. Ignore them for now
600  // std::cout<<"Attempted to probe for RA MPI messages in buffer "<<this->get_label()<<", but skipping this attempt since MPI tags have not yet been delivered"<<std::endl;
601  // return false;
602  // }
603  // bool is_q_msg = printerComm.Iprobe(source, myTags.RA_queue);
604  // bool is_loc_msg = printerComm.Iprobe(source, myTags.RA_loc);
605  // bool is_len_msg = printerComm.Iprobe(source, myTags.RA_length);
606  // return (is_q_msg or is_loc_msg or is_len_msg);
607  // }
608 
609  // // Retrieve sync buffer data from an MPI message
610  // // Should only be triggered if a valid message is known to exist to be retrieved from the input source!
611  // template<class T, std::size_t LENGTH>
612  // void VertexBufferNumeric1D<T,LENGTH>::get_sync_mpi_message(uint source, int exp_length)
613  // {
614  // this->MPImode_only(LOCAL_INFO); // throws error if MPI_mode()==false
615  // if(exp_length < 0)
616  // {
617  // std::ostringstream errmsg;
618  // errmsg << "Error retrieving sync message in buffer "<<this->get_label()<<"! Invalid expected message length supplied ("<<exp_length<<" < 0)";
619  // printer_error().raise(LOCAL_INFO, errmsg.str());
620  // }
621  // uint uexp_length = exp_length;
622 
623  // if(uexp_length > LENGTH)
624  // {
625  // std::ostringstream errmsg;
626  // errmsg << "Error retrieving sync message in buffer "<<this->get_label()<<"! Expected message length ("<<uexp_length<<") is larger than the allocated buffer size (LENGTH="<<LENGTH<<")";
627  // printer_error().raise(LOCAL_INFO, errmsg.str());
628  // }
629 
630  // // An MPI_Iprobe should have been done prior to calling this function,
631  // // in order to trigger delivery of the message to the correct buffer.
632  // // So now we trust that this buffer is indeed supposed to receive the
633  // // message. We can also use a blocking receive since we know that a
634  // // message is already waiting to be sent.
635 
636  // // Buffers to store received message
637  // int recv_buffer_valid[LENGTH]; // Would like to make this bool, but that requires MPI C++ bindings.
638  // T recv_buffer_entries[LENGTH];
639 
640  // //#ifdef MPI_DEBUG
641  // // Double check that a message is actually waiting to be sent
642  // // There is a code bug if this is not the case
643  // MPI_Status status;
644  // bool message_waiting1 = printerComm.Iprobe(source, myTags.SYNC_valid, &status);
645  // bool message_waiting2 = printerComm.Iprobe(source, myTags.SYNC_data, &status);
646  // if(not message_waiting1 and not message_waiting2)
647  // {
648  // std::ostringstream errmsg;
649  // errmsg << "Error! get_sync_mpi_message called with source="<<source<<", but there is no appropriately tagged message waiting to be delivered from that process! This is a bug, please report it.";
650  // printer_error().raise(LOCAL_INFO, errmsg.str());
651  // }
652  // // Double check that the message has the expected number of elements
653  // // (this must match across all the buffers we are retrieving together)
654  // int msgsize = GMPI::Get_count<T>(&status);
655  // if(msgsize != exp_length)
656  // {
657  // std::ostringstream errmsg;
658  // errmsg << "Error retrieving sync message in buffer "<<this->get_label()<<"! Message length ("<<msgsize<<") does not match expected length ("<<exp_length<<").";
659  // printer_error().raise(LOCAL_INFO, errmsg.str());
660  // }
661  // //#endif
662  //
663  // #ifdef MPI_DEBUG
664  // std::cout<<"rank "<<myRank<<": Collecting sync buffer ("<<this->get_label()<<") from process "<<source<<std::endl;
665  // #endif
666 
667  // printerComm.Recv(&recv_buffer_valid, msgsize, source, myTags.SYNC_valid);
668  // printerComm.Recv(&recv_buffer_entries, msgsize, source, myTags.SYNC_data);
669  //
670  // #ifdef MPI_DEBUG
671  // std::cout<<"rank "<<myRank<<"; buffer '"<<this->get_label()<<"': Received sync buffer from rank "<<source<<" (size="<<msgsize<<"). Appending received data to my sync buffers."<<std::endl;
672  // #endif
673 
674  // // Write the buffers to disk
675  // // write_external_to_disk(recv_buffer_entries,recv_buffer_valid);
676 
677  // // Rather than do external write, I think it is cleaner to just feed
678  // // everything through the normal "append" system.
679 
680  // for(int i=0; i<msgsize; i++)
681  // {
682  // // Push an element of the received data into the buffer
683  // if(recv_buffer_valid[i])
684  // {
685  // append(recv_buffer_entries[i]);
686  // }
687  // else
688  // {
689  // skip_append();
690  // }
691 
692  // // Check if we need to do a write to disk
693  // // Note; the buffer should have been emptied (if needed)
694  // // BEFORE get_sync_mpi_message() was called, so the if the
695  // // first append in this loop fails due to the buffer
696  // // being full then this indicates that that was
697  // // probably not done.
698  // if(sync_buffer_is_full())
699  // {
700  // #ifdef MPI_DEBUG
701  // std::cout<<"rank "<<myRank<<": During get_sync_mpi_message; Buffer "<<this->get_label()<<" full, emptying it..."<<std::endl;
702  // #endif
703  // flush();
704  // }
705  // }
706  // }
707 
708  // // Retrieve RA buffer data from an MPI message
709  // // Should only be triggered if a valid message is known to exist to be retrieved from the input source!
710  // template<class T, std::size_t LENGTH>
711  // void VertexBufferNumeric1D<T,LENGTH>::get_RA_mpi_message(uint source, const std::map<PPIDpair, ulong>& PPID_to_dsetindex)
712  // {
713  // this->MPImode_only(LOCAL_INFO); // throws error if MPI_mode()==false
714  // // An MPI_Iprobe should have been done prior to calling this function,
715  // // in order to trigger delivery of the message to the correct buffer.
716  // // So now we trust that this buffer is indeed supposed to receive the
717  // // message. We can also use a blocking receive since we know that a
718  // // message is already waiting to be sent.
719 
720  // // Buffers to store received messages
721  // T recv_buffer_RA_write_q[LENGTH];
722  // PPIDpair recv_buffer_RA_write_loc[LENGTH];
723  // uint recv_buffer_RA_q_len;
724 
725  // #ifdef MPI_DEBUG
726  // // Double check that a message is actually waiting to be sent
727  // // There is a code bug if this is not the case
728  // MPI_Status status;
729  // bool message_waiting1 = printerComm.Iprobe(source, myTags.RA_queue, &status);
730  // bool message_waiting2 = printerComm.Iprobe(source, myTags.RA_loc, &status);
731  // bool message_waiting3 = printerComm.Iprobe(source, myTags.RA_length, &status);
732  // if(not message_waiting1 and not message_waiting2 and not message_waiting3) {
733  // std::ostringstream errmsg;
734  // errmsg << "Error! get_RA_mpi_message called with source="<<source<<", but there is no appropriately tagged message waiting to be delivered from that process! This is a bug, please report it.";
735  // printer_error().raise(LOCAL_INFO, errmsg.str());
736  // }
737  // #endif
738 
739  // uint null_message;
740  // printerComm.Recv(&recv_buffer_RA_write_q, LENGTH, source, myTags.RA_queue );
741  // printerComm.Recv(&recv_buffer_RA_write_loc, LENGTH, source, myTags.RA_loc );
742  // printerComm.Recv(&recv_buffer_RA_q_len, 1, source, myTags.RA_length);
743  // printerComm.Recv(&null_message, 1, source, RA_BUFFERS_SENT); // absorbs one off the queue if there are several
744 
745  // #ifdef MPI_DEBUG
746  // std::cout<<"rank "<<myRank<<"; buffer '"<<this->get_label()<<"': Received random-access buffer from rank "<<source<<". Sending write commands through my RA buffers."<<std::endl;
747  // #endif
748 
749  // // feed all write commands through the master process RA_write commands
750  // for(uint i=0; i<recv_buffer_RA_q_len; i++)
751  // {
752  // RA_write(recv_buffer_RA_write_q[i], recv_buffer_RA_write_loc[i], PPID_to_dsetindex);
753  // }
754  // }
755 
756  // // Update myTags with valid values
757  // template<class T, std::size_t L>
758  // void VertexBufferNumeric1D<T,L>::update_myTags(uint first_tag)
759  // {
760  // this->MPImode_only(LOCAL_INFO); // throws error if MPI_mode()==false
761  // if(myTags.valid)
762  // {
763  // std::ostringstream errmsg;
764  // errmsg << "Error! Tried to update MPI tags for buffer "<<this->get_label()<<", but the current tags are already valid!";
765  // printer_error().raise(LOCAL_INFO, errmsg.str());
766  // }
767  // myTags = BuffTags(first_tag);
768  // return;
769  // }
770 
771  // #endif
772 
774  template<class T, std::size_t L>
775  T VertexBufferNumeric1D<T,L>::get_entry(const std::size_t i) const
776  {
777  if(this->is_silenced()) {
778  std::string errmsg = "Error! Attempted to retrieve data from a silenced buffer!";
779  printer_error().raise(LOCAL_INFO, errmsg);
780  }
781  if(buffer_valid[i])
782  {
783  return buffer_entries[i];
784  }
785  else
786  {
787  std::string errmsg = "Error! Attempted to retrieve data from an invalidated VertexBufferNumeric1D entry!";
788  printer_error().raise(LOCAL_INFO, errmsg);
789  }
790  }
791 
793  template<class T, std::size_t L>
794  void VertexBufferNumeric1D<T,L>::clear()
795  {
796  if(not this->is_silenced()) {
797  #ifdef BUF_DEBUG
798  #ifdef MONITOR_BUF
799  if(this->get_label()==MONITOR_BUF) {
800  #endif
801  std::cout<<"rank "<<myRank<<": Buffer "<<this->get_label()<<": clear()"<<std::endl;
802  #ifdef MONITOR_BUF
803  }
804  #endif
805  #endif
806 
807  for(std::size_t i=0; i<bufferlength; i++)
808  {
809  buffer_valid[i] = false;
810  buffer_entries[i] = 0;
811  }
812  this->reset_head();
813  this->sync_buffer_full = false;
814  this->sync_buffer_empty = true;
815  }
816  }
817 
823  template<class T, std::size_t L>
824  void VertexBufferNumeric1D<T,L>::N_skip_append(ulong N)
825  {
826  //std::cout << "rank "<<myRank<<": Pushing forward (new?) buffer '"<<this->get_label()<<"' by "<<N<<" positions"<<std::endl;
827  for(ulong i=0; i<N; i++)
828  {
829  if(this->sync_buffer_is_full()) clear();
830  skip_append();
831  }
832  }
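// Editorial annotation: N_skip_append() is used to fast-forward a buffer that
// is created part-way through a run, e.g. (hypothetically)
// newbuf.N_skip_append(n) to bring it level with buffers that have already
// recorded n points. Full buffers are clear()-ed rather than flushed, which is
// safe in this use case because every slot written here is an invalid
// placeholder.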
833 
835  }
836 }
837 #endif