GAMBIT  v1.5.0-252-gf9a3f78
a Global And Modular BSM Inference Tool
combine_hdf5.py
#!/usr/bin/env python

"""Combine data from several hdf5 files created by HDF5Printer into a single file"""
import signal
def sigint_handler(signum, frame):
    print 'CTRL+C is blocked while the HDF5Printer combine script runs! Signal received, but ignored.'
signal.signal(signal.SIGINT, sigint_handler)

from hdf5tools import *
import math
import numpy as np
import h5py
import os
import sys

chunksize = 1000

bufferlength = 100 # Must match setting in hdf5printer.hpp
max_ppidpairs = 10*bufferlength # " "

def usage():
    print " Usage: python combine_hdf5.py <path-to-target-hdf5-file> <root group in hdf5 files> <tmp file 1> <tmp file 2> ..."
    print
    print " Attempts to combine the data in a group of hdf5 files produced by HDF5Printer in separate processes during a GAMBIT run."
    print " Use --delete_tmp flag to delete input files upon successful combination."
    print " Use --runchecks flag to run some extra validity checks on the input and output data (warning: may be slow for large datasets)"
    exit(1)

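# Example invocation (the file names and root group below are hypothetical):
# combine two temporary files produced by MPI ranks 0 and 1 into 'combined.hdf5',
# using root group "/data", and delete the temporary files afterwards:
#
#   python combine_hdf5.py combined.hdf5 /data output.hdf5_temp_0 output.hdf5_temp_1 --delete_tmp
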
#====Begin "main"=================================

#if len(sys.argv)!=6 and len(sys.argv)!=7: usage()
#
runchecks=False
delete_tmp=False
if len(sys.argv)<3:
    usage()

# I don't think this works right...
if "--runchecks" in sys.argv:
    runchecks=True
    sys.argv.remove("--runchecks")

if "--delete_tmp" in sys.argv:
    delete_tmp=True
    sys.argv.remove("--delete_tmp")

outfname = sys.argv[1]
group = sys.argv[2]
tmp_files = sys.argv[3:]
N = len(tmp_files)
RA_group = group + "/RA"

print "Working directory: ", (os.getcwd())
print "Target combined filename:", outfname
print "Root group: ", group
print "Number of files to combine: ", N
print "Files to combine: ", tmp_files
print ""
print "Analysing input files..."

files = {}

sync_dsets = [set([]) for i in range(N)]
RA_dsets = [set([]) for i in range(N)]
RA_dsets_exclude = set(["RA_pointID","RA_pointID_isvalid","RA_MPIrank","RA_MPIrank_isvalid"])
sync_lengths = [0 for i in range(N)]
RA_lengths = [0 for i in range(N)]
fnames = tmp_files

for i,fname in enumerate(fnames):
    print " Opening: {0}".format(fname)
    f = h5py.File(fname,'r')
    files[fname] = f
    if runchecks:
        print "Checking temporary file for duplicates..."
        check_for_duplicates(f,group)
    print " Analysing..."
    datasets = []
    tmp_dset_metadata = {}
    tmp_RA_dset_metadata = {}
    # First get total lengths of the sync datasets
    if group in f:
        get_dset_lengths(tmp_dset_metadata,f[group],sync_dsets[i])
        sync_lengths[i] = check_lengths(tmp_dset_metadata)
    else:
        raise ValueError("File '{0}' does not contain the group '{1}!'".format(fname,group))
    # Next get total lengths of the RA datasets
    if RA_group in f:
        get_dset_lengths(tmp_RA_dset_metadata,f[RA_group],RA_dsets[i])
        RA_lengths[i] = check_lengths(tmp_RA_dset_metadata)
    else:
        RA_lengths[i] = 0
        # No RA data in this file, but that's ok, it happens sometimes.
        #raise ValueError("File '{0}' does not contain the group '{1}!'".format(fname,RA_group))
    print " ...done"

print "sync_lengths: ", sync_lengths
total_sync_length = sum(sync_lengths)

# Make sure all sync dsets have the same length

print "Sync dsets:"
for i,fname in enumerate(fnames):
    print " In {0}".format(fname)
    if len(sync_dsets[i])==0:
        print " - None"
    else:
        for item in sorted(sync_dsets[i]):
            print " - ", item
    print " sync_length = {0}".format(sync_lengths[i])

print "RA dsets:"
for i,fname in enumerate(fnames):
    print " In {0}".format(fname)
    if len(RA_dsets[i])==0:
        print " - None"
    else:
        for item in sorted(RA_dsets[i]):
            print " - ", item
    print " RA_length = {0}".format(RA_lengths[i])

print "Combined sync length = ", total_sync_length

print "Opening file for adding combined data:"
print " {0}".format(outfname)
fout = h5py.File(outfname,'a')

if not group in fout:
    gout = fout.create_group(group)
else:
    if runchecks:
        print "Checking existing combined output for duplicates..."
        check_for_duplicates(fout,group)
    gout = fout[group]

# Check for existing dsets in the output (and get their lengths if they exist)
existing_dsets = {}
dsetlengths = {}
for name in gout:
    if isinstance(gout[name],h5py.Dataset):
        print " Accessing existing dset:", name
        existing_dsets[name] = gout[name]
        dsetlengths[name] = gout[name].shape[0]

# check that the existing dsets have a consistent length
if len(dsetlengths)!=0:
    init_output_length = check_lengths(dsetlengths)
    print "Existing datasets have length ", init_output_length
else:
    init_output_length = 0


# Resize the existing dsets to accommodate the new data
for dsetname,dset in existing_dsets.items():
    print " Extending existing dset '{0}' to length {1}+{2}={3} to accommodate new data...".format(dsetname,init_output_length,total_sync_length,init_output_length+total_sync_length)
    dset.resize((init_output_length+total_sync_length,))

# Create dataset to match every sync and RA dataset
target_dsets = {}
all_sync_dsets = set([]).union(*sync_dsets)
all_RA_dsets = set([]).union(*RA_dsets)
for dsetname,dt in sorted(all_sync_dsets):
    if not dsetname in gout:
        print " Creating empty dset:", dsetname
        target_dsets[dsetname] = gout.create_dataset(dsetname, (init_output_length+total_sync_length,), chunks=(chunksize,), dtype=dt, maxshape=(None,))
    else:
        target_dsets[dsetname] = gout[dsetname]
for dsetname,dt in sorted(all_RA_dsets):
    if not dsetname in RA_dsets_exclude:
        if not dsetname in gout:
            print " Creating empty dset:", dsetname
            target_dsets[dsetname] = gout.create_dataset(dsetname, (init_output_length+total_sync_length,), chunks=(chunksize,), dtype=dt, maxshape=(None,))
        else:
            target_dsets[dsetname] = gout[dsetname]

# Copy data from separate sync datasets into combined datasets
nextempty=init_output_length
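# 'nextempty' is the index of the first unused row in the combined sync datasets;
# each file's sync data is copied in starting at this offset, and the offset is then
# advanced by the length of that file's datasets.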
for fname in fnames:
    print "Copying sync data from file:"
    print " {0}".format(fname)
    print "to file:"
    print " {0}".format(outfname)
    fin = files[fname]

    if runchecks:
        print "Checking {0}[{1}] for duplicate entries".format(fname,group)
        check_for_duplicates(fin,group)
        print "No duplicates, proceeding with copy"

    dset_length=None
    for itemname in fin[group]:
        item = fin[group][itemname]
        if isinstance(item,h5py.Dataset):
            #print itemname,"is a Dataset"
            if dset_length==None:
                dset_length=item.shape[0]
            #Do the copy
            copy_dset_whole(item,gout[itemname],nextempty)
    if(dset_length==None):
        print "No sync dsets found! Nothing copied!"
    else:
        if runchecks:
            print "Re-checking combined file for duplicates..."
            check_for_duplicates(fout,group)
        nextempty+=dset_length

# Copy data from RA datasets into combined dataset
for fname in fnames:
    fin = files[fname]
    if RA_group in fin and "RA_MPIrank" in fin[RA_group]:
        print "RA data detected in file:"
        print " {0}".format(fname)
        print "Copying it to file:"
        print " {0}".format(outfname)

        # Get write targets
        dset_length=fin[RA_group]["RA_MPIrank"].shape[0]

        # Do this in chunks matching the max_ppidpairs value in hdf5printer.hpp
        nchunks = np.ceil(dset_length / (1.*max_ppidpairs))
        for i in range(int(nchunks)):
            imin = i*max_ppidpairs
            imax = np.min([(i+1)*max_ppidpairs, dset_length])
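            # For example, with bufferlength=100 (so max_ppidpairs=1000) a hypothetical
            # RA dataset of length 2500 is processed in nchunks=3 slices:
            # [0,1000), [1000,2000) and [2000,2500); the last slice is clamped by imax.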

            pointIDs_in = fin[RA_group]["RA_pointID"][imin:imax]
            mpiranks_in = fin[RA_group]["RA_MPIrank"][imin:imax]
            pointIDs_isvalid_in = np.array(fin[RA_group]["RA_pointID_isvalid"][imin:imax],dtype=np.bool)
            mpiranks_isvalid_in = np.array(fin[RA_group]["RA_MPIrank_isvalid"][imin:imax],dtype=np.bool)

            mask_in = (pointIDs_isvalid_in & mpiranks_isvalid_in)
            print "...{0} scheduled RA writes found.".format(np.sum(mask_in))

            # convert entries to single values to facilitate fast comparison
            IDs_in = cantor_pairing(pointIDs_in[mask_in],mpiranks_in[mask_in])
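            # cantor_pairing (from hdf5tools.py) packs each (pointID, MPIrank) pair into a
            # single integer key; the standard Cantor pairing is pi(x,y) = (x+y)*(x+y+1)/2 + y,
            # which grows quadratically in its inputs, hence the overflow check on IDs_out below.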

            #if runchecks:
            #    print " checking input selection for duplicate keys..."
            #    ids = IDs_in
            #    pid = pointIDs_in[mask_in]
            #    rank = mpiranks_in[mask_in]
            #    for ID,p,r in zip(ids,pid,rank):
            #        Nmatches = np.sum(ID==ids)
            #        if Nmatches>1:
            #            print " Warning!", ID, "is duplicated {0} times!".format(Nmatches)
            #            pMatch = np.sum(p==pid)
            #            rMatch = np.sum(r==rank)
            #            if pMatch>1 or rMatch>1:
            #                print " ...pointID duplicate count: ", pMatch
            #                print " ...MPIrank duplicate count: ", rMatch

            # Find them in the target output file
            # TODO: this part could still cause memory issues if the output datasets are too big
            pointIDs_out = fout[group]["pointID"]
            mpiranks_out = fout[group]["MPIrank"]
            pointIDs_isvalid_out = np.array(fout[group]["pointID_isvalid"][:],dtype=np.bool)
            mpiranks_isvalid_out = np.array(fout[group]["MPIrank_isvalid"][:],dtype=np.bool)

            mask_out = (pointIDs_isvalid_out & mpiranks_isvalid_out)
            print "...{0} possible RA targets found.".format(np.sum(mask_out))

            # convert entries to single values to facilitate fast comparison
            IDs_out = cantor_pairing(
                          np.array(pointIDs_out[mask_out],dtype=np.longlong),
                          np.array(mpiranks_out[mask_out],dtype=np.longlong)
                      )
            # check that the pairing has not overflowed
            if np.any(IDs_out<0):
                raise ValueError("Error while computing cantor pairing for RA to SYNC matching! Integer overflow detected, so matching will fail! Please increase the size of the integer type used!")

            if runchecks:
                print " checking output selection for duplicate keys..."
                ids = IDs_out
                pid = pointIDs_out[mask_out]
                rank = mpiranks_out[mask_out]
                for ID,p,r in zip(ids,pid,rank):
                    Nmatches = np.sum(ID==ids)
                    if Nmatches>1:
                        print " Warning!", ID, "is duplicated {0} times!".format(Nmatches)
                        Match = np.sum((p==pid) & (r==rank))
                        if Match>1:
                            print " ...MPIrank/pointID duplicate count: ", Match

            # Find which IDs in the output dataset are write targets
            target_mask_small = np.in1d(IDs_out,IDs_in)

            # Cast the mask back so that it works on the original dataset
            # (TODO: is there a more efficient way to do this? "target_mask[mask_out][target_mask_small] = True" does not work.)
            target_length = fout[group]["pointID"].shape[0]
            alltargetindices = np.arange(target_length)
            maskindices = alltargetindices[mask_out][target_mask_small]

            target_mask = np.zeros(target_length, dtype=bool)
            target_mask[maskindices] = True
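            # Illustration of the index-casting above (hypothetical values):
            #   mask_out          = [T, F, T, T]   -> output rows 0, 2, 3 hold valid points
            #   target_mask_small = [F, T, T]      -> of those, the 2nd and 3rd are RA targets
            #   maskindices = arange(4)[mask_out][target_mask_small] = [2, 3]
            #   target_mask = [F, F, T, T] over the full output dataset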

            if runchecks:
                print " Double-checking that all selected input dset entries have matching targets in the output dsets..."
                for ID,pid,rank in zip(IDs_in,pointIDs_in[mask_in],mpiranks_in[mask_in]):
                    if ID not in IDs_out: #[target_mask_small]:
                        print " Warning! No target for ID {0} found in output selection! ({1},{2})".format(ID,pid,rank)
                        indexid = np.where( (np.array(IDs_out)==ID) )
                        index = np.where( (np.array(pointIDs_out[mask_out])==pid) &
                                          (np.array(mpiranks_out[mask_out])==rank) )
                        print "index of match by ID = ", indexid
                        print "index of match by pid,rank = ", index
                        print "pid,rank =",pid,rank
                        print "pointID? ", pointIDs_out[mask_out][index]
                        print "valid? ", pointIDs_isvalid_out[mask_out][index]
                        print "rank? ", mpiranks_out[mask_out][index]
                        print "valid? ", mpiranks_isvalid_out[mask_out][index]
                        print "cantor =", cantor_pairing(pid,rank)
                        print "cantor==ID? ", cantor_pairing(pid,rank)==ID


            # Number of "true" elements in target mask should match number of elements in 'in' arrays (after masking)
            # Check this
            ntargets = np.sum(target_mask)
            nsources = np.sum(mask_in)
            if ntargets!=nsources:
                raise ValueError("Error while computing targets for RA writes! Number of target matches for writes in the output dataset ({0}) does not match the number of elements scheduled for copying {1}!".format(ntargets,nsources))

            # (some lines of the original listing are omitted here)

            print " Sorting input data to match order in output selection..."
            xsort = np.argsort(IDs_in)
            yindex = np.searchsorted(IDs_in[xsort], IDs_out[target_mask_small])
            fancyindices = xsort[yindex]
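            # Sketch of the matching logic with hypothetical values:
            #   IDs_in = [12, 5, 9]                -> xsort = [1, 2, 0], IDs_in[xsort] = [5, 9, 12]
            #   IDs_out[target_mask_small] = [5, 12, 9]
            #   yindex = searchsorted([5, 9, 12], [5, 12, 9]) = [0, 2, 1]
            #   fancyindices = xsort[yindex] = [1, 0, 2], so IDs_in[fancyindices] = [5, 12, 9],
            #   i.e. the input rows are now ordered to match the output selection.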

            if runchecks:
                print " Checking that matching of input data to output selection has succeeded..."
                print yindex[:10]
                for k,(i,j) in enumerate(zip(IDs_out[target_mask_small][:100],IDs_in[fancyindices[:100]])):
                    if i!=j:
                        print " Warning! Mismatch found. At position {0}, input ID ({1}) != output ID ({2})".format(k,i,j)
                        #print i,j
                        #print IDs_out[k-1:k+2]
                        #print IDs_in[fancyindices[k-1:k+2]]
                        #print "pids and ranks:"
                        #print pointIDs_in[mask_in][fancyindices[k-1:k+2]]
                        #print mpiranks_in[mask_in][fancyindices[k-1:k+2]]

            # Copy the data from the source to the target, after first
            # rearranging the source according to the index array we just created
            for itemname in fin[RA_group]:
                item = fin[RA_group][itemname]
                if isinstance(item,h5py.Dataset) and not itemname in RA_dsets_exclude:
                    #Do the copy
                    indset = item[imin:imax]
                    #(TODO: figure out how to chunk this...)
                    outdset = fout[group][itemname]
                    #Check that types match
                    if indset.dtype!=outdset.dtype:
                        raise ValueError("Type mismatch detected between dataset {0} in file {1} (dtype={2}), and matching dataset in output file {3} (dtype={4})".format(itemname,fname,indset.dtype,outfname,outdset.dtype))
                    #can't do the fancy list-indexing directly on the hdf5 dataset (the boolean assignment should be ok though)
                    print " Copying RA data for dataset", itemname
                    outdset[target_mask] = indset[mask_in][fancyindices]
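                    # Note: the boolean-mask assignment should fill the True positions of
                    # target_mask in ascending index order, which is why the source rows were
                    # reordered (via fancyindices) to follow the order of the output selection.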

# DEBUGGING
# Check for duplicates in combined output
if runchecks:
    print "Checking final combined output for duplicates..."
    check_for_duplicates(fout,group)

# If everything has been successful, delete the temporary files
if delete_tmp:
    print "Deleting temporary HDF5 files..."
    for fname in fnames:
        print " {0}".format(fname)
        os.remove(fname)

print "Data combination completed"
Referenced helper functions (defined in hdf5tools.py):
  def get_dset_lengths(d, group, dsets)                         (hdf5tools.py:14)
  def check_lengths(d)                                          (hdf5tools.py:28)
  def copy_dset_whole(indset, outdset, nextempty, basemsg="")   (hdf5tools.py:59)
  def cantor_pairing(x, y)                                      (hdf5tools.py:67)
  def check_for_duplicates(fout, group)                         (hdf5tools.py:71)