Source code for gamdpy.integrators.NVT

import numpy as np
import numba
from numba import cuda
import gamdpy as gp
from .integrator import Integrator

[docs] class NVT(Integrator): """ The leapfrog algorithm with a Nose-Hoover thermostat Integrator keeping N (number of particles), V (volume), and T (temperature) constant, using the leapfrog algorithm and a Nose-Hoover thermostat. Parameters ---------- temperature : float or function The temperature to keep the system at. If a function, it should take time as argument. tau : float The relaxation time of the thermostat. dt : float The time step of the integration. """ def __init__(self, temperature, tau: float, dt: float) -> None: self.temperature = temperature self.tau = tau self.dt = dt self.thermostat_state = np.zeros(2, dtype=np.float32) # Right time to allocate and copy to device? self.d_thermostat_state = cuda.to_device(self.thermostat_state) # - or in get_params def get_params(self, configuration: gp.Configuration, interactions_params: tuple, verbose=False) -> tuple: dt = np.float32(self.dt) omega2 = np.float32(4.0 * np.pi * np.pi / self.tau / self.tau) degrees = configuration.N * configuration.D - configuration.D return (dt, omega2, degrees, self.d_thermostat_state) # Needs to be compatible with unpacking in # step() and update_thermostat_state() below. def get_kernel(self, configuration: gp.Configuration, compute_plan: dict, compute_flags: dict, interactions_kernel, verbose=False): # Unpack parameters from configuration and compute_plan D, num_part = configuration.D, configuration.N pb, tp, gridsync = [compute_plan[key] for key in ['pb', 'tp', 'gridsync']] num_blocks = (num_part - 1) // pb + 1 # Convert temperature to a function if isn't allready (better be a number then...) if callable(self.temperature): temperature_function = self.temperature else: temperature_function = gp.make_function_constant(value=float(self.temperature)) if verbose: print(f'Generating NVT kernel for {num_part} particles in {D} dimensions:') print(f'\tpb: {pb}, tp:{tp}, num_blocks:{num_blocks}') print(f'\tNumber (virtual) particles: {num_blocks * pb}') print(f'\tNumber of threads {num_blocks * pb * tp}') # Unpack indices for vectors and scalars to be compiled into kernel r_id, v_id, f_id = [configuration.vectors.indices[key] for key in ['r', 'v', 'f']] m_id = configuration.sid['m'] compute_k = compute_flags['K'] compute_fsq = compute_flags['Fsq'] if compute_k: k_id = configuration.sid['K'] if compute_fsq: fsq_id = configuration.sid['Fsq'] # JIT compile functions to be compiled into kernel temperature_function = numba.njit(temperature_function) apply_PBC = numba.njit(configuration.simbox.get_apply_PBC()) def step(grid, vectors, scalars, r_im, sim_box, integrator_params, time): """ Make one NVT timestep using Leap-frog Kernel configuration: [num_blocks, (pb, tp)] """ # Unpack parameters. MUST be compatible with get_params() above dt, omega2, degrees, thermostat_state = integrator_params factor = np.float32(0.5) * thermostat_state[0] * dt plus = np.float32(1.) / (np.float32(1.) + factor) # Possibly change to exp(...) minus = np.float32(1.) - factor # Possibly change to exp(...) global_id, my_t = cuda.grid(2) if global_id < num_part and my_t == 0: my_r = vectors[r_id][global_id] my_v = vectors[v_id][global_id] my_f = vectors[f_id][global_id] my_m = scalars[global_id][m_id] my_k = numba.float32(0.0) # Kinetic energy if compute_fsq: my_fsq = numba.float32(0.0) # force squared for k in range(D): if compute_fsq: my_fsq += my_f[k] * my_f[k] my_v[k] = plus * (minus * my_v[k] + my_f[k] / my_m * dt) my_k += numba.float32(0.5) * my_m * my_v[k] * my_v[k] my_r[k] += my_v[k] * dt apply_PBC(my_r, r_im[global_id], sim_box) cuda.atomic.add(thermostat_state, 1, my_k) # Probably slow? Not really! if compute_k: scalars[global_id][k_id] = my_k if compute_fsq: scalars[global_id][fsq_id] = my_fsq return def update_thermostat_state(integrator_params, time): # Unpack parameters. MUST be compatible with get_params() above dt, omega2, degrees, thermostat_state = integrator_params global_id, my_t = cuda.grid(2) if global_id == 0 and my_t == 0: target_temperature = temperature_function(time) ke_deviation = np.float32(2.0) * thermostat_state[1] / (degrees * target_temperature) - np.float32(1.0) thermostat_state[0] += dt * omega2 * ke_deviation thermostat_state[1] = np.float32(0.) return step = cuda.jit(device=gridsync)(step) update_thermostat_state = cuda.jit(device=gridsync)(update_thermostat_state) if gridsync: # construct and return device function def kernel(grid, vectors, scalars, r_im, sim_box, integrator_params, time, ptype): step( grid, vectors, scalars, r_im, sim_box, integrator_params, time) grid.sync() update_thermostat_state(integrator_params, time) return return cuda.jit(device=gridsync)(kernel) else: # return python function, which makes kernel-calls def kernel(grid, vectors, scalars, r_im, sim_box, integrator_params, time, ptype): step[num_blocks, (pb, 1)](grid, vectors, scalars, r_im, sim_box, integrator_params, time) update_thermostat_state[1, (1, 1)](integrator_params, time) return return kernel