# This class is used for loading the output of a simulation as a dictionary following the formatting of sim.output or save sim.output to file
# Can be also used to convert between output formats (rumd3 -> gamdpy supported so far)
import sys
import os
import time
import h5py
import numpy as np
# This class is a wrapper for several possible inputs/output
[docs]
class TrajectoryIO():
"""
This class handles the loading and saving of simulation data.
When the class can be instantiated with an output from a previous simulation.
The data can be saved in self.h5 in the same format of the sim.output object.
When the class is instantiated without an input, self.h5 is None and can be assigned afterward (used to save output from memory simulation)
Parameters
----------
name : str
Name of the file or folder to read output from. Can be a gamdpy .h5 output or a rumd3 TrajectoryFiles folder.
Examples
--------
>>> import gamdpy as gp
>>> import h5py
>>> output = gp.tools.TrajectoryIO("examples/Data/LJ_r0.973_T0.70_toread.h5").get_h5()
Found .h5 file (examples/Data/LJ_r0.973_T0.70_toread.h5), loading to gamdpy as output dictionary
>>> nblocks, nconfs, N, D = output['trajectory/positions'].shape
>>> print(f"Output file examples/Data/LJ_r0.973_T0.70.h5 contains a simulation of {N} particles in {D} dimensions")
Output file examples/Data/LJ_r0.973_T0.70.h5 contains a simulation of 2048 particles in 3 dimensions
>>> print(f"The simulation output is divided into {nblocks} blocks, each of them with {nconfs} configurations")
The simulation output is divided into 32 blocks, each of them with 12 configurations
>>> output = gp.tools.TrajectoryIO("examples/Data/NVT_N4000_T2.0_rho1.2_KABLJ_rumd3/TrajectoryFiles").get_h5() # Read from rumd3
Found rumd3 TrajectoryFiles, loading to rumpdy as output dictionary
>>> nblocks, nconfs, N, D = output['trajectory/positions'].shape
>>> print(f"File examples/Data/NVT_N4000_T2.0_rho1.2_KABLJ_rumd3/TrajectoryFiles contains a simulation of {N} particles in {D} dimensions")
File examples/Data/NVT_N4000_T2.0_rho1.2_KABLJ_rumd3/TrajectoryFiles contains a simulation of 4000 particles in 3 dimensions
>>> print(f"The simulation output is divided into {nblocks} blocks, each of them with {nconfs} configurations")
The simulation output is divided into 2 blocks, each of them with 24 configurations
"""
def __init__(self, name=""):
import importlib.util # This python library can be used if module can be imported
## Following lines defines the initialization behavior depending on given input
if name[-3:]==".h5":
modification_time = os.path.getmtime(name)
readable_time = time.ctime(modification_time)
#print(f"Found .h5 file ({name}, {readable_time}), loading to gamdpy as output dictionary")
print(f"Found .h5 file ({name}), loading to gamdpy as output dictionary") # Cant handle timestamp in doctest
self.h5 = self.load_h5(name)
elif "TrajectoryFiles" in name:
try:
assert os.path.isdir(name)==True
print("Found rumd3 TrajectoryFiles, loading to rumpdy as output dictionary")
self.h5 = self.load_rumd3(name)
except:
raise Exception(f"Folder {name} doesn't exists")
elif name=="":
print("Warning: class initialized without input data, set self.h5 manually")
self.h5 = None
else:
print("Input not recognized, unsupported format")
self.h5 = None
## Following lines deals with the compression of the output
# The class is initialized with some default compression settings depending on what is available.
# These parameters can be changed by the user by changing the class attributes
# Following might be relevant when .save_h5 would modify compression when writing to disk
# Checks if hdf5plugin lib is available
# if importlib.util.find_spec("hdf5plugin")==None:
# self.compression_type = "gzip"
# self.compression_opts = 6 # seems the best option in terms of compression and timing
# else:
# import hdf5plugin
# if hdf5plugin is available use BZip2 compression (slightly better and faster)
# self.compression_type = hdf5plugin.BZip2()
[docs]
def get_h5(self) -> h5py.File:
""" Returns self.h5 """
return self.h5
[docs]
def load_h5(self, name:str) -> h5py.File:
""" Makes self.h5 a view of the .h5 files """
return h5py.File(name, "r")
# Load from TrajectoryFiles (std rumd3 output)
# It assumes trajectories are spaced in log2
[docs]
def load_rumd3(self, name:str) -> h5py.File:
""" Reads a rumd3 TrajectoryFiles folder and convert it into gamdpy .h5 output. Assumes RectangularSimulationBox, corresponding to gamdpy's Orthorhombic.
This function returns a memory .h5"""
import os, gzip, glob
import pandas as pd
# Rumd3 output is always in D=3
dim = 3
# Checks if energy output is present
if "LastComplete_energies.txt" not in os.listdir(name):
print("LastComplete_energies.txt not present")
energy = False
else:
energy = True
# Checks if trajectory output is present
if "LastComplete_trajectory.txt" not in os.listdir(name):
print("LastComplete_trajectory.txt not present")
traj = False
else:
traj = True
# Exit if no output is found in TrajectoryFiles folder
if not energy and not traj:
print(f"The folder {name} has no LastComplete_*.txt files, returning None")
return None
# Reads block information from LastComplete_*
if energy: nblocks, blocksize = np.loadtxt(f"{name}/LastComplete_energies.txt", dtype=np.int32)
else : nblocks, blocksize = np.loadtxt(f"{name}/LastComplete_trajectory.txt", dtype=np.int32)
# Defining output memory .h5 file
fullpath = f"{name}"
output = h5py.File(f"{id(fullpath)}.h5", "w", driver='core', backing_store=False)
assert isinstance(output, h5py.File), "Error creating memory h5 file in TrajectoryIO.load_rumd3"
# Read trajectories
if traj:
traj_files = sorted(glob.glob(f"{name}/trajectory*"))
# Remove last block if incomplete
if traj_files[-1]==f"{name}/trajectory{nblocks+1:04d}.xyz.gz": traj_files = traj_files[:-1]
# Read metadata from first file in the list
with gzip.open(f"{traj_files[0]}", "r") as f:
npart = int(f.readline())
cmt_line = f.readline().decode().split()
meta_data = dict()
for item in cmt_line:
key, val = item.split("=")
meta_data[key] = val
ntrajinblock = int(meta_data['logLin'].split(",")[3])
num_types = int(meta_data['numTypes'])
masses = [float(x) for x in meta_data['mass'].split(',')]
assert len(masses) == num_types
if meta_data['ioformat'] == '1': # old rumd format
lengths = np.array([float(x) for x in meta_data['boxLengths'].split(',')], dtype=np.float32)
else:
assert meta_data['ioformat'] == '2'
sim_box_data = meta_data['sim_box'].split(',')
sim_box_type = sim_box_data[0]
sim_box_params = [float(x) for x in sim_box_data[1:]]
assert sim_box_type == 'RectangularSimulationBox'
lengths = np.array(sim_box_params, dtype=np.float32)
integrator_data = meta_data['integrator'].split(',')
timestep = integrator_data[1]
# Loop over the files and read them assuming each line is type, x, y, z, imx, imy, imz
toskip1 = np.array([ (npart+2)*x for x in range(1+ntrajinblock)])
toskip2 = np.array([1+(npart+2)*x for x in range(1+ntrajinblock)])
toskip = sorted(list(np.concatenate((toskip1, toskip2))))
positions = list()
images = list()
for trajectory in traj_files:
tmp_data = pd.read_csv(trajectory, skiprows = toskip, usecols=[0,1,2,3,4,5,6], names=["type", "x", "y", "z", "imx", "imy", "imz"], delimiter=" ")
type_array = tmp_data['type'].to_numpy()
pos_array = np.c_[tmp_data['x'].to_numpy(), tmp_data['y'].to_numpy(), tmp_data['z'].to_numpy()]
img_array = np.c_[tmp_data['imx'].to_numpy(), tmp_data['imy'].to_numpy(), tmp_data['imz'].to_numpy()]
positions.append(pos_array.reshape((-1,npart,dim)))
images.append(img_array.reshape((-1,npart,dim)))
# Saving data in output h5py
output.attrs['dt'] = timestep
output.attrs['simbox_initial'] = lengths
output.create_group('trajectory')
output['trajectory'].create_dataset('positions', shape=(len(traj_files), 1+ntrajinblock, npart, dim), dtype=np.float32)
output['trajectory/positions'][:,:,:,:] = np.array(positions)
output['trajectory'].create_dataset('images', shape=(len(traj_files), 1+ntrajinblock, npart, dim), dtype=np.int32)
output['trajectory/images'][:,:,:,:] = np.array(images)
#output.create_dataset("ptype", data=type_array[:npart], shape=(npart), dtype=np.int32)
output.create_group('initial_configuration')
output['initial_configuration'].create_dataset("ptype", data=type_array[:npart], shape=(npart), dtype=np.int32)
# Read energies
if energy:
energy_files = sorted(glob.glob(f"{name}/energies*"))
# Remove last block if incomplete
if energy_files[-1]==f"{name}/energies{nblocks+1:04d}.dat.gz": energy_files = energy_files[:-1]
# Read metadata from first file in the list
with gzip.open(f"{energy_files[0]}", "r") as f:
# ioformat=2 N=4096 Dt=147.266830 columns=ke,pe,p,T,Etot,W (example of lin saving)
# ioformat=2 N=4096 timeStepIndex=0 logLin=0,2,0,17,0 columns=ke,pe,p,T,Etot,W (example of log saving)
cmt_line = f.readline().decode().split()[1:]
meta_data = dict()
for item in cmt_line:
key, val = item.split("=")
meta_data[key] = val
npart = meta_data['N']
if 'Dt' in meta_data: save_interval = meta_data['Dt']
elif 'logLin' in meta_data: save_interval = f'log{meta_data["logLin"][1]}'
col_names = meta_data['columns'].split(",")
all_energies = list()
for energies in energy_files:
tmp_data = pd.read_csv(energies, skiprows=1, names=col_names, usecols = [i for i in range(len(col_names))], delimiter=" ")
all_energies.append(tmp_data.to_numpy())
# Saving data in output h5py
grp = output.create_group('scalar_saver')
if 'dt' in output.attrs.keys() and 'Dt' in meta_data:
output['scalar_saver'].attrs['steps_between_output'] = float(save_interval)/float(output.attrs['dt'])
output['scalar_saver'].attrs['time_between_output'] = save_interval
output['scalar_saver'].attrs['scalar_names'] = list(col_names)
output.create_dataset('scalar_saver/scalars', data=np.vstack(all_energies))
return output
[docs]
def save_h5(self, name:str, mode="w"):
"""
This method saves self.h5 to disk.
It can be used to save sim.output to file if TrajectoryIO class is initialized without arguments and then using self.h5 = sim.output.
LC: By default each runtime action has a compression setting. Would be nice if this function can change that when saving.
"""
import h5py
#import importlib.util # These lines will be relevant when .save_h5 can modify compression of saved output
#if importlib.util.find_spec("hdf5plugin")!=None:
# import hdf5plugin
with h5py.File(name, mode=mode) as fout:
for key in self.h5.keys():
self.h5.copy(source=self.h5[key], dest=fout, name=key)
return