import numpy as np
import numba
import json
import h5py
from numba import cuda, config
from .runtime_action import RuntimeAction
from .time_scheduler import Log2
[docs]
class TrajectorySaver(RuntimeAction):
"""
Runtime action for saving configurations during timeblock.
Parameters
----------
scheduler : ~gamdpy.Scheduler
The scheduler defining when to save
include_simbox : bool, optional
Boolean deciding if to include simbox informations in output
Default False
compression : str, optional
String selecting the type of compression used.
Default "gzip"
compression_opts : int, optional
Relevant if "gzip" compression is used. Select compression option for gzip
saving_list : list, optional
This list is used to select which information to save in the trajectory.
Default ['positions', 'images'].
Can be used to save only velocities by setting saving_list = ['velocities'].
Possible list entries are 'positions', 'images', 'velocities', 'forces'.
verbose : bool, optional
Default False
Raises
------
ValueError
If saving_list contains a name which is not recongnized.
"""
def __init__(self, scheduler=Log2(), include_simbox=False,
compression="gzip", compression_opts=4,
saving_list=['positions', 'images'],
update_ptype = False,
update_topology = False,
verbose=False) -> None:
self.scheduler = scheduler
self.include_simbox = include_simbox
self.num_vectors = len(saving_list)
self.translator = {'positions' : 'r', 'images' : 'img', 'velocities' : 'v', 'forces' : 'f'}
self.datatypes = {'positions' : np.float32, 'images': np.int32, 'velocities' : np.float32, 'forces' : np.float32}
self.compression = compression
self.sid = {} # LC : this should be a list
# check that items of saving_list are known
for item in saving_list:
if item not in list(self.datatypes.keys()):
raise KeyError(f"{item} is not recognized. Accepted values are 'positions', 'images', 'velocities', 'forces'.")
self.sid[self.translator[item]]=True
self.saving_list = saving_list
self.update_ptype = update_ptype
self.update_topology = update_topology
if self.compression == 'gzip':
self.compression_opts = compression_opts
else:
self.compression_opts = None
#self.sid = {"r":0, "r_im":1}
def extract_positions(h5file):
return h5file['trajectory/positions']
def extract_images(h5file):
return h5file['trajectory/images']
def extract_velocities(h5file):
return h5file['trajectory/velocities']
def extract_forces(h5file):
return h5file['trajectory/forces']
def setup(self, simulation, num_timeblocks: int, steps_per_timeblock: int, output, verbose=False) -> None:
self.simulation = simulation
self.configuration = self.simulation.configuration
if type(num_timeblocks) != int or num_timeblocks < 0:
raise ValueError(f'num_timeblocks ({num_timeblocks}) should be non-negative integer.')
self.num_timeblocks = num_timeblocks
if type(steps_per_timeblock) != int or steps_per_timeblock < 0:
raise ValueError(f'steps_per_timeblock ({steps_per_timeblock}) should be non-negative integer.')
self.steps_per_timeblock = steps_per_timeblock
# pass the number of steps to the scheduler
# without this line the scheduler does nothing at all
self.scheduler.setup(stepmax=self.steps_per_timeblock, ntimeblocks=self.num_timeblocks)
# both steps '0' and the last one are already counted by the scheduler
self.conf_per_block = self.scheduler.nsaves# + 1
#self.conf_per_block = int(math.log2(self.steps_per_timeblock)) + 2 # Should be user controllable
# Setup output
if verbose:
print(f'Storing results in memory. Expected footprint {self.num_timeblocks * self.conf_per_block * self.num_vectors * self.configuration.N * self.configuration.D * 4 / 1024 / 1024:.2f} MB.')
if 'trajectory' in output.keys():
del output['trajectory']
output.create_group('trajectory')
output.create_group('trajectory/topologies')
# Compression has a different syntax depending if is gzip or not because gzip can have also a compression_opts
# it is possible to use compression=None for not compressing the data
#output.create_dataset('trajectory_saver/positions',
# shape=(self.num_timeblocks, self.conf_per_block, self.configuration.N, self.configuration.D),
# chunks=(1, 1, self.configuration.N, self.configuration.D),
# dtype=np.float32, compression=self.compression, compression_opts=self.compression_opts)
#output.create_dataset('trajectory_saver/images',
# shape=(self.num_timeblocks, self.conf_per_block, self.configuration.N, self.configuration.D),
# chunks=(1, 1, self.configuration.N, self.configuration.D),
# dtype=np.int32, compression=self.compression, compression_opts=self.compression_opts)
for key in self.saving_list:
output.create_dataset(f'trajectory/{key}',
shape=(self.num_timeblocks, self.conf_per_block, self.configuration.N, self.configuration.D),
chunks=(1, 1, self.configuration.N, self.configuration.D),
dtype=self.datatypes[f'{key}'], compression=self.compression, compression_opts=self.compression_opts)
# ptype is a Virtual dataset: https://docs.h5py.org/en/stable/vds.html
if self.update_ptype == False:
layout = h5py.VirtualLayout(shape=(self.num_timeblocks, self.conf_per_block, self.configuration.N), dtype=np.int32)
for block in range(self.num_timeblocks):
for conf in range(self.conf_per_block):
layout[block, conf] = h5py.VirtualSource(output['initial_configuration/ptype'])
output.create_virtual_dataset('trajectory/ptypes', layout, fillvalue=0)
else:
# LC: Need to implement how to save them per step (similar to what done with positions etc)
print(f"update_ptype = True is not implemented")
exit()
if self.update_topology == False:
for block in range(self.num_timeblocks):
# LC: names should be adjusted
output[f'trajectory/topologies/block{block:04d}'] = h5py.SoftLink('/initial_configuration/topology')
else:
# LC: Need to implement how to save them per step
print(f"update_topology = True is not implemented")
exit()
output['trajectory'].attrs['compression_info'] = f"{self.compression} with opts {self.compression_opts}"
output['trajectory'].attrs['num_timeblocks'] = self.num_timeblocks
output['trajectory'].attrs['steps_per_timeblock'] = self.steps_per_timeblock
output['trajectory'].attrs['trajectory_columns'] = list(self.sid.keys())
output['trajectory'].attrs['update_ptype'] = self.update_ptype
output['trajectory'].attrs['update_topology'] = self.update_topology
# Scheduler info
self.scheduler.info_to_h5(output['trajectory'])
#output.attrs['vectors_names'] = list(self.sid.keys())
if self.include_simbox:
if 'sim_box' in output['trajectory'].keys():
del output['trajectory/sim_box']
output.create_dataset('trajectory/sim_box',
shape=(self.num_timeblocks, self.conf_per_block, self.configuration.simbox.len_sim_box_data), dtype=np.float32)
flag = config.CUDA_LOW_OCCUPANCY_WARNINGS
config.CUDA_LOW_OCCUPANCY_WARNINGS = False
self.zero_kernel = self.make_zero_kernel()
config.CUDA_LOW_OCCUPANCY_WARNINGS = flag
def get_params(self, configuration, compute_plan):
self.conf_array = np.zeros((self.conf_per_block, self.num_vectors, self.configuration.N, self.configuration.D),
dtype=np.float32)
self.d_conf_array = cuda.to_device(self.conf_array)
if self.include_simbox:
self.sim_box_output_array = np.zeros((self.conf_per_block, self.configuration.simbox.len_sim_box_data), dtype=np.float32)
self.d_sim_box_output_array = cuda.to_device(self.sim_box_output_array)
return (self.d_conf_array, self.d_sim_box_output_array)
else:
return (self.d_conf_array,)
def make_zero_kernel(self):
# Unpack parameters from configuration and compute_plan
D, num_part = self.configuration.D, self.configuration.N
pb = 32
num_blocks = (num_part - 1) // pb + 1
def zero_kernel(array):
Nx, Ny, Nz, Nw = array.shape
global_id = cuda.grid(1)
if global_id < Nz: # particles
for i in range(Nx):
for j in range(Ny):
for k in range(Nw):
array[i, j, global_id, k] = numba.float32(0.0)
zero_kernel = cuda.jit(zero_kernel)
return zero_kernel[num_blocks, pb]
def update_at_end_of_timeblock(self, timeblock: int, output_reference):
data = self.d_conf_array.copy_to_host()
# note that d_conf_array has dimensions (self.conf_per_block, 2, self.configuration.N, self.configuration.D)
#output_reference['trajectory_saver/positions'][timeblock], output_reference['trajectory_saver/images'][timeblock] = data[:, 0], data[:, 1]
for key in self.saving_list:
output_reference[f'trajectory/{key}'][timeblock] = data[:,self.saving_list.index(key)]
#output['trajectory_saver'][block, :] = self.d_conf_array.copy_to_host()
if self.include_simbox:
output_reference['trajectory/sim_box'][timeblock, :] = self.d_sim_box_output_array.copy_to_host()
self.zero_kernel(self.d_conf_array)
def get_poststep_kernel(self, configuration, compute_plan, verbose=False):
pb, tp, gridsync = [compute_plan[key] for key in ['pb', 'tp', 'gridsync']]
if gridsync:
def kernel(grid, vectors, scalars, r_im, sim_box, step, conf_saver_params):
pass
return
return cuda.jit(device=gridsync)(kernel)
else:
def kernel(grid, vectors, scalars, r_im, sim_box, step, conf_saver_params):
pass
return kernel
def get_prestep_kernel(self, configuration, compute_plan, verbose=False):
# Unpack parameters from configuration and compute_plan
D, num_part = configuration.D, configuration.N
pb, tp, gridsync = [compute_plan[key] for key in ['pb', 'tp', 'gridsync']]
num_blocks = (num_part - 1) // pb + 1
sim_box_array_length = configuration.simbox.len_sim_box_data
include_simbox = self.include_simbox
# Unpack indices for scalars to be compiled into kernel
unpacked_saving_list = [key in self.saving_list for key in ['positions', 'images', 'velocities', 'forces']]
save_r, save_img, save_v, save_f = unpacked_saving_list
pos_r, pos_img, pos_v, pos_f = [self.saving_list.index(key) if key in self.saving_list else None for key in ['positions', 'images', 'velocities', 'forces']]
r_id, v_id, f_id = [configuration.vectors.indices[key] for key in ['r', 'v', 'f']]
# get function to check steps in the kernel, already compiled
stepcheck_function = numba.njit(getattr(self.scheduler, 'stepcheck_func'))
def kernel(grid, vectors, scalars, r_im, sim_box, step, conf_saver_params):
if include_simbox:
conf_array, sim_box_output_array = conf_saver_params
else:
conf_array, = conf_saver_params
flag, save_index = stepcheck_function(step)
if flag:
global_id, my_t = cuda.grid(2)
if global_id < num_part and my_t == 0:
for k in range(D):
if save_r: conf_array[save_index, pos_r, global_id, k] = vectors[r_id][global_id, k]
if save_img: conf_array[save_index, pos_img, global_id, k] = np.float32(r_im[global_id, k])
if save_v: conf_array[save_index, pos_v, global_id, k] = vectors[v_id][global_id, k]
if save_f: conf_array[save_index, pos_f, global_id, k] = vectors[f_id][global_id, k]
if include_simbox and global_id == 0:
for k in range(sim_box_array_length):
sim_box_output_array[save_index, k] = sim_box[k]
return
kernel = cuda.jit(device=gridsync)(kernel)
if gridsync:
return kernel # return device function
else:
return kernel[num_blocks, (pb, 1)] # return kernel, incl. launch parameters