Source code for gamdpy.runtime_actions.momentum_reset

import numpy as np
import numba
import math
from numba import cuda

# Abstract Base Class and type annotation
from .runtime_action import RuntimeAction
from gamdpy import Configuration


# Could include flags of dimensions to work on
[docs] class MomentumReset(RuntimeAction): """ Runtime action that sets the total momentum of configuration to zero every `steps_between_reset` time step. """ def __init__(self, steps_between_reset: int) -> None: if type(steps_between_reset) != int or steps_between_reset < 0: raise ValueError(f'steps_between_momentum_reset ({steps_between_reset}) should be non-negative integer.') self.steps_between_reset = steps_between_reset def setup(self, simulation, num_timeblocks: int, steps_per_timeblock: int, output, verbose=False) -> None: pass def get_params(self, configuration: Configuration, compute_plan: dict) -> tuple: self.total_momentum = np.zeros(configuration.D+1, dtype=np.float32) # Total mass summed in last index of total_momentum self.d_total_momentum = cuda.to_device(self.total_momentum) return (self.d_total_momentum, ) # return parameters as a tuple def get_prestep_kernel(self, configuration: Configuration, compute_plan: dict): pb, tp, gridsync = [compute_plan[key] for key in ['pb', 'tp', 'gridsync']] if gridsync: def kernel(grid, vectors, scalars, r_im, sim_box, step, momentum_reset_params): pass return return cuda.jit(device=gridsync)(kernel) else: def kernel(grid, vectors, scalars, r_im, sim_box, step, momentum_reset_params): pass return kernel def get_poststep_kernel(self, configuration: Configuration, compute_plan: dict): # Unpack parameters from configuration and compute_plan D, num_part = configuration.D, configuration.N pb, tp, gridsync = [compute_plan[key] for key in ['pb', 'tp', 'gridsync']] num_blocks = (num_part - 1) // pb + 1 # Unpack indices for vectors and scalars to be compiled into kernel v_id = configuration.vectors.indices['v'] m_id = configuration.sid['m'] steps_between_reset = self.steps_between_reset def zero_momentum(cm_velocity): global_id, my_t = cuda.grid(2) if global_id == 0 and my_t == 0: for k in range(D+1): cm_velocity[k] = np.float32(0.) return def sum_momentum(vectors, scalars, cm_velocity): global_id, my_t = cuda.grid(2) if global_id < num_part and my_t == 0: my_m = scalars[global_id][m_id] for k in range(D): cuda.atomic.add(cm_velocity, k, my_m * vectors[v_id][global_id][k]) cuda.atomic.add(cm_velocity, D, my_m) # Total mass summed in last index of cm_velocity return def shift_velocities(vectors, cm_velocity): global_id, my_t = cuda.grid(2) if global_id < num_part and my_t == 0: for k in range(D): vectors[v_id][global_id,k] -= cm_velocity[k] / cm_velocity[D] return zero_momentum = cuda.jit(device=gridsync)(zero_momentum) sum_momentum = cuda.jit(device=gridsync)(sum_momentum) shift_velocities = cuda.jit(device=gridsync)(shift_velocities) if gridsync: def kernel(grid, vectors, scalars, r_im, sim_box, step, momentum_reset_params): cm_velocity, = momentum_reset_params if step%steps_between_reset == 0: zero_momentum(cm_velocity) grid.sync() sum_momentum(vectors, scalars, cm_velocity) grid.sync() shift_velocities(vectors, cm_velocity) return return cuda.jit(device=gridsync)(kernel) else: def kernel(grid, vectors, scalars, r_im, sim_box, step, momentum_reset_params): cm_velocity, = momentum_reset_params if step%steps_between_reset == 0: zero_momentum[1, 1](cm_velocity) sum_momentum[num_blocks, (pb, 1)](vectors, scalars, cm_velocity) shift_velocities[num_blocks, (pb, 1)](vectors, cm_velocity) return return kernel