Source code for gamdpy.runtime_actions.stress_saver

import numpy as np
import numba
import math
from numba import cuda, config
import json

from .runtime_action import RuntimeAction
from .time_scheduler import Lin


[docs] class StressSaver(RuntimeAction): """ Runtime action for saving stress tensor(a D x D matrix) during a timeblock every `steps_between_output` time steps. """ def __init__(self, steps_between_output:int = 16, compute_flags = None, verbose=False) -> None: if type(steps_between_output) != int or steps_between_output < 0: raise ValueError(f'steps_between_output ({steps_between_output}) should be non-negative integer.') # For now only to put the scheduler information in the h5 file self.scheduler = Lin(steps_between=steps_between_output) self.steps_between_output = steps_between_output self.compute_flags = compute_flags self.verbose = verbose def get_compute_flags(self): return self.compute_flags def setup(self, configuration, num_timeblocks:int, steps_per_timeblock:int, output, verbose=False) -> None: self.configuration = configuration D = configuration.D if type(num_timeblocks) != int or num_timeblocks < 0: raise ValueError(f'num_timeblocks ({num_timeblocks}) should be non-negative integer.') self.num_timeblocks = num_timeblocks if type(steps_per_timeblock) != int or steps_per_timeblock < 0: raise ValueError(f'steps_per_timeblock ({steps_per_timeblock}) should be non-negative integer.') self.steps_per_timeblock = steps_per_timeblock if self.steps_between_output >= steps_per_timeblock: raise ValueError(f'scalar_output ({self.steps_between_output}) must be less than steps_per_timeblock ({steps_per_timeblock})') # per block saving of stress tensor if not configuration.compute_flags['stresses']: raise RuntimeError('stresses not set in compute_flags') self.stress_saves_per_block = self.steps_per_timeblock//self.steps_between_output # Setup output shape = (self.num_timeblocks, self.stress_saves_per_block, D, D) if 'stresses' in output.keys(): del output['stresses'] output.create_group('stresses') output.create_dataset('stresses/stress_tensor', shape=shape, chunks=(1, self.stress_saves_per_block, D, D), dtype=np.float32) output['stresses'].attrs['steps_between_output'] = self.steps_between_output # LC: This should be removed because it's above already # Setup scheduler, and write the relevant information to the h5 file self.scheduler.setup(stepmax=self.steps_per_timeblock, ntimeblocks=self.num_timeblocks) self.scheduler.info_to_h5(output['stresses']) #output['stresses'].attrs['scheduler'] = 'Lin' #self.scheduler.__class__.__name__ #output['stresses'].attrs['scheduler'] = self.scheduler.__class__.__name__ #output['stresses'].attrs['scheduler_info'] = json.dumps(self.scheduler.kwargs) #output['stresses'].attrs['scheduler_info'] = json.dumps({'Dt':self.steps_between_output}) #json.dumps(self.scheduler.kwargs) #output['stresses'].create_dataset('steps', data=self.scheduler.steps, dtype=np.int32) flag = config.CUDA_LOW_OCCUPANCY_WARNINGS config.CUDA_LOW_OCCUPANCY_WARNINGS = False self.zero_kernel = self.make_zero_kernel_3() config.CUDA_LOW_OCCUPANCY_WARNINGS = flag def make_zero_kernel_3(self): """ Returns a kernel that can zero an array with three axes """ def zero_kernel(array): Nx, Ny, Nz = array.shape #i, j = cuda.grid(2) # doing simple 1 thread kernel for now ... for i in range(Nx): for j in range(Ny): for k in range(Nz): array[i,j,k] = numba.float32(0.0) zero_kernel = cuda.jit(zero_kernel) return zero_kernel[1,1] def get_params(self, configuration, compute_plan): D = configuration.D self.output_array = np.zeros((self.stress_saves_per_block, D, D), dtype=np.float32) self.d_output_array = cuda.to_device(self.output_array) self.params = (self.steps_between_output, self.d_output_array) return self.params def initialize_before_timeblock(self, timeblock: int, output_reference): self.zero_kernel(self.d_output_array) def update_at_end_of_timeblock(self, timeblock: int, output_reference): volume = self.configuration.get_volume() output_reference['stresses/stress_tensor'][timeblock, :] = self.d_output_array.copy_to_host() / volume def get_prestep_kernel(self, configuration, compute_plan): # Unpack parameters from configuration and compute_plan D, num_part = configuration.D, configuration.N pb, tp, gridsync = [compute_plan[key] for key in ['pb', 'tp', 'gridsync']] num_blocks = (num_part - 1) // pb + 1 m_id = configuration.sid['m'] v_id = configuration.vectors.indices['v'] sx_id = configuration.vectors.indices['sx'] sy_id = configuration.vectors.indices['sy'] if D > 2: sz_id = configuration.vectors.indices['sz'] if D > 3: sw_id = configuration.vectors.indices['sw'] #volume_function = numba.njit(configuration.simbox.get_volume_function()) def kernel(grid, vectors, scalars, r_im, sim_box, step, runtime_action_params): """ """ steps_between_output, output_array = runtime_action_params # Needs to be compatible with get_params above if step%steps_between_output==0: save_index = step//steps_between_output if save_index < output_array.shape[0]: global_id, my_t = cuda.grid(2) if global_id < num_part and my_t == 0: my_m = scalars[global_id][m_id] for k in range(D): cuda.atomic.add(output_array, (save_index, 0, k), vectors[sx_id][global_id][k] - my_m * vectors[v_id][global_id][0]*vectors[v_id][global_id][k]) cuda.atomic.add(output_array, (save_index, 1, k), vectors[sy_id][global_id][k] - my_m * vectors[v_id][global_id][1]*vectors[v_id][global_id][k]) if D > 2: cuda.atomic.add(output_array, (save_index, 2, k), vectors[sz_id][global_id][k] - my_m * vectors[v_id][global_id][2]*vectors[v_id][global_id][k]) if D > 3: cuda.atomic.add(output_array, (save_index, 3, k), vectors[sw_id][global_id][k] - my_m * vectors[v_id][global_id][3]*vectors[v_id][global_id][k]) return kernel = cuda.jit(device=gridsync)(kernel) if gridsync: return kernel # return device function else: return kernel[num_blocks, (pb, 1)] # return kernel, incl. launch parameters #### CONSIDER USING POSTSTEP TO GET CORRECT VELOCITIES ############### def get_poststep_kernel(self, configuration, compute_plan, verbose=False): pb, tp, gridsync = [compute_plan[key] for key in ['pb', 'tp', 'gridsync']] if gridsync: def kernel(grid, vectors, scalars, r_im, sim_box, step, conf_saver_params): pass return return cuda.jit(device=gridsync)(kernel) else: def kernel(grid, vectors, scalars, r_im, sim_box, step, conf_saver_params): pass return kernel # Class functions to read data def extract(h5file, first_block=0, last_block=None, subsample=1): #stress_data = h5file['stress_saver/stress_tensor'] h5grp = h5file['stresses'] nblocks, per_block, D, D2 = h5grp['stress_tensor'].shape assert D == D2 final_rows = (nblocks-first_block) * per_block return h5grp['stress_tensor'][first_block:last_block,:,:, :].reshape(final_rows, D, D)[::subsample] def get_times(h5file, first_block=0, last_block=None, reset_time=True, subsample=1): num_timeblock, saves_per_timeblock = h5file['stresses/stress_tensor'][first_block:last_block,:,0,0].shape times_array = np.arange(0,num_timeblock*saves_per_timeblock, step=subsample)*h5file.attrs['dt'] return times_array