import numpy as np
import numba
import math
from numba import cuda
import matplotlib.pyplot as plt
import gamdpy as gp
from .interaction import Interaction
[docs]
class PairPotential(Interaction):
""" Pairwise interaction potential for a system of particles.
Parameters
----------
pairpotential_function : callable
A JIT compiled function `f(r, params)` that takes a separation distance `r` (float)
and a list of parameters, and returns a triplet of floats:
- Potential energy, :math:`u(r)`
- Force multiplier, :math:`-u'(r)/r`
- Second derivative of potential energy, :math:`u''(r)`
params : list of floats or nested list of floats
Interaction parameters for the pair potential function. Use a nested list for multiple types of particles.
The last element of each list is the cutoff distance of the pair potential.
max_num_nbs : int
Maximum number of neighbors per particle to allocate in the neighbor list.
exclusions : array_like
List of particle indices to exclude from interactions for each particle.
Example
-------
The standard Lennard-Jones potential shifted and truncated at 2.5:
>>> pair_func = gp.apply_shifted_potential_cutoff(gp.LJ_12_6_sigma_epsilon)
>>> sig, eps, cut = 1.0, 1.0, 2.5
>>> pair_pot = gp.PairPotential(pair_func, params=[sig, eps, cut], max_num_nbs=1000)
>>> interactions = [pair_pot, ] # Passed to a Simulation instance
"""
def __init__(self, pairpotential_function, params, max_num_nbs, exclusions=None):
def params_function(i_type, j_type, params):
result = params[i_type, j_type] # default: read from params array
return result
self.pairpotential_function = pairpotential_function
self.params_function = params_function
self.params_user = params
if exclusions is None or type(exclusions) == np.ndarray:
self.exclusions = exclusions
elif type(exclusions) == list:
self.exclusions = self.merge_exclusions(exclusions)
else:
raise ValueError("Class PairPotential: parameter exclusions must be a numpy array or a list of numpy arrays")
self.max_num_nbs = max_num_nbs
[docs]
def merge_exclusions(self, exclusion_list, max_exclusions=None):
""" Take a list of exclusion arrays are return a merged array"""
if not max_exclusions:
max_exclusions = max([excl.shape[1] for excl in exclusion_list]) - 1
num_excl_arrays = len(exclusion_list)
n_excl = exclusion_list[0].shape[0]
merged_exclusions = np.zeros( (n_excl, max_exclusions+1), dtype=np.int32)
for idx in range(n_excl):
this_excl_list = []
for n in range(num_excl_arrays):
num_elements_this_atom = exclusion_list[n][idx, -1]
this_excl_list.append(exclusion_list[n][idx, 0:num_elements_this_atom])
# Now concatenate to make a single array, removing duplicates
this_excl_arr = np.unique(np.concatenate(this_excl_list))
size_this_excl_arr = len(this_excl_arr)
merged_exclusions[idx, 0:size_this_excl_arr] = this_excl_arr
merged_exclusions[idx, -1] = size_this_excl_arr
return merged_exclusions
def convert_user_params(self):
# Upgrade any scalar parameters to 1x1 numpy array
num_params = len(self.params_user)
params_list = []
for parameter in self.params_user:
if np.isscalar(parameter):
params_list.append(np.ones((1,1))*parameter)
else:
params_list.append(np.array(parameter, dtype=np.float32))
# Ensure all parameters are the right format (num_types x num_types) numpy arrays
num_types = params_list[0].shape[0]
for parameter in params_list:
assert len(parameter.shape) == 2
assert parameter.shape[0] == num_types
assert parameter.shape[1] == num_types
# Convert params to the format required by kernels (num_types x num_types) array of tuples (p0, p1, ..., cutoff)
params = np.zeros((num_types, num_types), dtype="f,"*num_params)
for i in range(num_types):
for j in range(num_types):
plist = []
for parameter in params_list:
plist.append(parameter[i,j])
params[i,j] = tuple(plist)
max_cut = np.float32(np.max(params_list[-1]))
return params, max_cut
def plot(self, xlim=None, ylim=(-3,6), figsize=(8,4), names=None):
params, max_cut = self.convert_user_params()
num_types = len(params[0])
if names==None:
names = np.arange(num_types)
plt.figure(figsize=figsize)
for i in range(num_types):
for j in range(num_types):
r = np.linspace(0, params[i,j][-1], 1000)
u, s, lap = self.pairpotential_function(r, params[i,j])
plt.plot(r, u, label=f'{names[i]} - {names[j]}')
if xlim!=None:
plt.xlim(xlim)
plt.ylim(ylim)
plt.xlabel('Pair distance')
plt.ylabel('Pair potential')
plt.legend()
plt.show()
def evaluate_potential_function(self, r, types):
params, max_cut = self.convert_user_params()
u, s, lap = self.pairpotential_function(r, params[types[0], types[1]])
return u
def check_datastructure_validity(self) -> bool:
nbflag = self.nblist.d_nbflag.copy_to_host()
if nbflag[0] != 0 or nbflag[1] != 0:
raise RuntimeError(f'Neighbor-list is invalid. Try allocating space for more neighbors (max_num_nbs in PairPot object). Allocated size: {self.max_num_nbs}, but {nbflag[1]+1} neighbours found. {nbflag=}.')
return True
def get_params(self, configuration: gp.Configuration, compute_plan: dict, verbose=False) -> tuple:
self.params, max_cut = self.convert_user_params()
self.d_params = cuda.to_device(self.params)
if compute_plan['nblist'] == 'N squared':
self.nblist = gp.NbList2(configuration, self.exclusions, self.max_num_nbs)
elif compute_plan['nblist'] == 'linked lists':
self.nblist = gp.NbListLinkedLists(configuration, self.exclusions, self.max_num_nbs)
else:
raise ValueError(f"No lblist called: {compute_plan['nblist']}. Use either 'N squared' or 'linked lists'")
nblist_params = self.nblist.get_params(max_cut, compute_plan, verbose)
return (self.d_params, self.nblist.d_nblist, nblist_params)
def get_kernel(self, configuration: gp.Configuration, compute_plan: dict, compute_flags: dict[str,bool], verbose=False):
num_cscalars = configuration.num_cscalars
compute_u = compute_flags['U']
compute_w = compute_flags['W']
compute_lap = compute_flags['lapU']
compute_stresses = compute_flags['stresses']
# Unpack parameters from configuration and compute_plan
D, num_part = configuration.D, configuration.N
pb, tp, gridsync, UtilizeNIII = [compute_plan[key] for key in ['pb', 'tp', 'gridsync', 'UtilizeNIII']]
num_blocks = (num_part - 1) // pb + 1
if verbose:
print(f'\tpb: {pb}, tp:{tp}, num_blocks:{num_blocks}')
print(f'\tNumber (virtual) particles: {num_blocks*pb}')
print(f'\tNumber of threads {num_blocks*pb*tp}')
if compute_stresses:
print('\tIncluding computation of stress tensor in pair potential')
# Unpack indices for vectors and scalars to be compiled into kernel
r_id, f_id = [configuration.vectors.indices[key] for key in ['r', 'f']]
if compute_u:
u_id = configuration.sid['U']
if compute_w:
w_id = configuration.sid['W']
if compute_lap:
lap_id = configuration.sid['lapU']
if compute_stresses:
sx_id = configuration.vectors.indices['sx']
if D > 1:
sy_id = configuration.vectors.indices['sy']
if D > 2:
sz_id = configuration.vectors.indices['sz']
if D > 3:
sw_id = configuration.vectors.indices['sw']
pairpotential_function = numba.njit(self.pairpotential_function)
if UtilizeNIII:
virial_factor_NIII = numba.float32( 1.0/configuration.D)
def pairpotential_calculator(ij_dist, ij_params, dr, my_f, cscalars, my_stress, f, other_id):
u, s, umm = pairpotential_function(ij_dist, ij_params)
for k in range(D):
cuda.atomic.add(f, (other_id, k), dr[k]*s)
my_f[k] = my_f[k] - dr[k]*s # Force
if compute_w:
cscalars[w_id] += dr[k]*dr[k]*s*virial_factor_NIII # Virial
if compute_stresses:
for k2 in range(D):
my_stress[k,k2] -= dr[k]*dr[k2]*s
if compute_u:
cscalars[u_id] += u # Potential energy
if compute_lap:
cscalars[lap_id] += (numba.float32(1-D)*s + umm)*numba.float32( 2.0 ) # Laplacian
return
else:
virial_factor = numba.float32( 0.5/configuration.D )
def pairpotential_calculator(ij_dist, ij_params, dr, my_f, cscalars, my_stress, f, other_id):
u, s, umm = pairpotential_function(ij_dist, ij_params)
half = numba.float32(0.5)
for k in range(D):
my_f[k] = my_f[k] - dr[k]*s # Force
if compute_w:
cscalars[w_id] += dr[k]*dr[k]*s*virial_factor # Virial
if compute_stresses:
for k2 in range(D):
my_stress[k,k2] -= half*dr[k]*dr[k2]*s # stress tensor
if compute_u:
cscalars[u_id] += half*u # Potential energy
if compute_lap:
cscalars[lap_id] += numba.float32(1-D)*s + umm # Laplacian
return
ptype_function = numba.njit(configuration.ptype_function)
params_function = numba.njit(self.params_function)
pairpotential_calculator = numba.njit(pairpotential_calculator)
dist_sq_dr_function = numba.njit(configuration.simbox.get_dist_sq_dr_function())
@cuda.jit( device=gridsync )
def calc_forces(vectors, cscalars, ptype, sim_box, nblist, params):
""" Calculate forces as given by pairpotential_calculator() (needs to exist in outer-scope) using nblist
Kernel configuration: [num_blocks, (pb, tp)]
"""
my_block = cuda.blockIdx.x
local_id = cuda.threadIdx.x
global_id = my_block*pb + local_id
my_t = cuda.threadIdx.y
max_nbs = nblist.shape[1]-1
#if global_id < num_part and my_t==0: # Initialize global variables. Should be controlled by flag if more pair-potentials
# for k in range(num_cscalars):
# cscalars[global_id, k] = numba.float32(0.0)
my_f = cuda.local.array(shape=D,dtype=numba.float32)
my_dr = cuda.local.array(shape=D,dtype=numba.float32)
my_cscalars = cuda.local.array(shape=num_cscalars, dtype=numba.float32)
if compute_stresses:
my_stress = cuda.local.array(shape=(D,D), dtype=numba.float32)
else:
my_stress = cuda.local.array(shape=(1,1), dtype=numba.float32)
if global_id < num_part:
for k in range(D):
#my_r[k] = r[global_id, k]
my_f[k] = numba.float32(0.0)
if compute_stresses:
for k2 in range(D):
my_stress[k,k2] = numba.float32(0.0)
for k in range(num_cscalars):
my_cscalars[k] = numba.float32(0.0)
my_type = ptype_function(global_id, ptype)
cuda.syncthreads() # Make sure initializing global variables to zero is done
if global_id < num_part:
for i in range(my_t, nblist[global_id, max_nbs], tp):
other_id = nblist[global_id, i]
other_type = ptype_function(other_id, ptype)
dist_sq = dist_sq_dr_function(vectors[r_id][other_id], vectors[r_id][global_id], sim_box, my_dr)
ij_params = params_function(my_type, other_type, params)
cut = ij_params[-1]
if dist_sq < cut*cut:
pairpotential_calculator(math.sqrt(dist_sq), ij_params, my_dr, my_f, my_cscalars, my_stress, vectors[f_id], other_id)
for k in range(D):
cuda.atomic.add(vectors[f_id], (global_id, k), my_f[k])
if compute_stresses:
cuda.atomic.add(vectors[sx_id], (global_id, k), my_stress[0,k])
if D > 1:
cuda.atomic.add(vectors[sy_id], (global_id, k), my_stress[1,k])
if D > 2:
cuda.atomic.add(vectors[sz_id], (global_id, k), my_stress[2,k])
if D > 3:
cuda.atomic.add(vectors[sw_id], (global_id, k), my_stress[3,k])
for k in range(num_cscalars):
cuda.atomic.add(cscalars, (global_id, k), my_cscalars[k])
return
nblist_check_and_update = self.nblist.get_kernel(configuration, compute_plan, compute_flags, verbose)
if gridsync:
# A device function, calling a number of device functions, using gridsync to syncronize
@cuda.jit( device=gridsync )
def compute_interactions(grid, vectors, scalars, ptype, sim_box, interaction_parameters):
params, nblist, nblist_parameters = interaction_parameters
nblist_check_and_update(grid, vectors, scalars, ptype, sim_box, nblist, nblist_parameters)
grid.sync()
calc_forces(vectors, scalars, ptype, sim_box, nblist, params)
return
return compute_interactions
else:
# A python function, making several kernel calls to syncronize
def compute_interactions(grid, vectors, scalars, ptype, sim_box, interaction_parameters):
params, nblist, nblist_parameters = interaction_parameters
nblist_check_and_update(grid, vectors, scalars, ptype, sim_box, nblist, nblist_parameters)
calc_forces[num_blocks, (pb, tp)](vectors, scalars, ptype, sim_box, nblist, params)
return
return compute_interactions