import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, TextIO, Tuple, Union
import numpy as np
from kliff.dataset.dataset import Configuration
from kliff.models.parameter import Parameter
from kliff.utils import yaml_dump, yaml_load
[docs]
class ComputeArguments:
"""
Compute property (e.g. energy, forces, and stress) for a configuration.
This is the base class for other compute arguments. Typically, a user will not
directly use this.
Args:
conf: atomic configurations
supported_species: species supported by the potential model, with chemical
symbol as key and integer code as value.
influence_distance: influence distance (aka cutoff distance) to calculate neighbors
compute_energy: whether to compute energy
compute_forces: whether to compute forces
compute_stress: whether to compute stress
"""
implemented_property = []
def __init__(
self,
conf: Configuration,
supported_species: Dict[str, int],
influence_distance: float,
compute_energy: bool = True,
compute_forces: bool = True,
compute_stress: bool = False,
):
self.conf = conf
self.supported_species = supported_species
self.influence_distance = influence_distance
self.compute_energy = compute_energy
self.compute_forces = compute_forces
self.compute_stress = compute_stress
self.compute_property = self._check_compute_property()
self.results = {p: None for p in self.implemented_property}
[docs]
def compute(self, params: Dict[str, Parameter]):
"""
Compute the properties required by the compute flags, and store them in
self.results.
Args:
params: the parameters of the model.
Example:
energy = a_func_to_compute_energy()
forces = a_func_to_compute_forces()
stress = a_func_to_compute_stress()
self.results['energy'] = energy
self.results['forces'] = forces
self.results['stress'] = stress
"""
raise NotImplementedError('"compute" method not implemented.')
[docs]
def get_compute_flag(self, name: str) -> bool:
"""
Check whether the model is asked to compute property.
Args:
name: name of the property, e.g. energy, forces, and stresses
"""
if name in self.compute_property:
return True
else:
return False
[docs]
def get_property(self, name: str) -> Any:
"""
Get a property by name.
Args:
name: name of the property, e.g. energy, forces, and stresses
"""
if name not in self.compute_property:
ModelError(f"Model not initialized to compute `{name}`.")
return self.results[name]
[docs]
def get_energy(self) -> float:
"""
Potential energy.
"""
return self.get_property("energy")
[docs]
def get_forces(self) -> np.ndarray:
"""
2D array of shape (N,3) of the forces on atoms, where N is the number of atoms
in the configuration.
"""
return self.get_property("forces")
[docs]
def get_stress(self) -> np.ndarray:
"""
1D array of the virial stress, in Voigt notation.
"""
return self.get_property("stress")
[docs]
def get_prediction(self) -> np.ndarray:
"""
1D array of prediction from the model for the configuration.
"""
if self.compute_energy:
energy = self.results["energy"]
pred = np.asarray([energy])
else:
pred = np.asarray([])
if self.compute_forces:
forces = self.results["forces"]
pred = np.concatenate((pred, forces.ravel()))
if self.compute_stress:
stress = self.results["stress"]
pred = np.concatenate((pred, stress))
return pred
[docs]
def get_reference(self) -> np.ndarray:
"""
1D array of reference values for the configuration.
"""
if self.compute_energy:
energy = self.conf.energy
ref = np.asarray([energy])
else:
ref = np.asarray([])
if self.compute_forces:
forces = self.conf.forces
ref = np.concatenate((ref, forces.ravel()))
if self.compute_stress:
stress = self.conf.stress
ref = np.concatenate((ref, stress))
return ref
def _check_compute_property(self):
compute_property = []
if self.compute_energy:
if "energy" not in self.implemented_property:
raise NotImplementedError("`Energy` not implemented in model.")
else:
compute_property.append("energy")
if self.compute_forces:
if "forces" not in self.implemented_property:
raise NotImplementedError("`Forces` not implemented in model.")
else:
compute_property.append("forces")
if self.compute_stress:
if "stress" not in self.implemented_property:
raise NotImplementedError("`Stress` not implemented in model.")
else:
compute_property.append("stress")
return compute_property
[docs]
class Model:
def __init__(
self,
model_name: str = None,
):
self.model_name = model_name
self.model_params = self.init_model_params()
self.influence_distance = self.init_influence_distance()
self.supported_species = self.init_supported_species()
self.mutable_param_list = []
[docs]
def init_model_params(self, *args, **kwargs) -> Dict[str, Parameter]:
raise NotImplementedError("`init_model_params` not implemented.")
[docs]
def init_influence_distance(self, *args, **kwargs) -> float:
raise NotImplementedError("`init_influence_distance` not implemented.")
[docs]
def init_supported_species(self, *args, **kwargs) -> Dict[str, int]:
raise NotImplementedError("`init_supported_species` not implemented.")
[docs]
def get_compute_argument_class(self):
raise NotImplementedError("`get_compute_argument_class` not implemented.")
[docs]
def write_kim_model(self, path: Path = None):
raise NotImplementedError("`write_kim_model` not implemented yet.")
[docs]
def get_influence_distance(self) -> float:
return self.influence_distance
[docs]
def get_supported_species(self) -> Dict[str, int]:
return self.supported_species
[docs]
def get_model_params(self) -> Dict[str, Parameter]:
return self.model_params
[docs]
def echo_model_params(
self,
filename: Union[Path, TextIO, None] = sys.stdout,
) -> str:
params = self.model_params
s = "#" + "=" * 80 + "\n"
s += "# Available parameters to optimize (In MODEL SPACE).\n"
name = self.__class__.__name__ if self.model_name is None else self.model_name
s += f"# Model: {name}\n"
s += "#" + "=" * 80 + "\n\n"
for name, p in params.items():
s += f"name: {name}\n"
s += f"value: {p.get_numpy_array_model_space()}\n"
s += f"size: {len(p)}\n\n"
s += "#" + "=" * 80 + "\n"
s += (
"# Following parameters have transformation objects attached, \n"
"# Parameter value in PARAM SPACE: \n"
)
for name, p in params.items():
if p.transform_function is not None:
s += f"name: {name}\n"
s += f"value: {p.get_numpy_array_param_space()}\n"
s += f"size: {len(p)}\n\n"
s += "#" + "=" * 80 + "\n"
if filename is not None:
if isinstance(filename, (str, Path)):
with open(filename, "w") as f:
f.write(s)
else:
print(s, file=filename)
return s
[docs]
def read_opt_params(self, filename: Path):
pass
[docs]
def set_params_mutable(self, list_of_params: List[str]):
"""
Set all the optimizable parameters from list of names of parameters
Args:
list_of_params: List of string names of parameters
Example:
model.set_params_mutable(["A", "B", "sigma"])
"""
for param in list_of_params:
self.model_params[param].add_opt_mask(
np.ones_like(self.model_params[param], dtype=bool)
)
self.mutable_param_list = list_of_params
[docs]
def set_opt_params(self, **kwargs):
keys = list(kwargs.keys())
for name, setting in kwargs.items():
self.set_one_opt_param(name, setting)
[docs]
def set_one_opt_param(self, name: str, settings: List[List[Any]]):
param = self.model_params[name]
# check the val kind
opt_mask = np.zeros_like(param, dtype=bool)
param_old = param.get_numpy_array_param_space()
bounds = np.array(
[[None, None]] * param_old.shape[0]
) # for consistent boolean matching
for i in range(param_old.shape[0]):
supplied_value = settings[i][0]
if supplied_value == "default":
param_old[i] = param.get_numpy_array_param_space()[i]
elif isinstance(supplied_value, (int, float)):
param_old[i] = supplied_value
elif isinstance(supplied_value, np.ndarray) or isinstance(
supplied_value, Parameter
):
param_old[i] = supplied_value[0]
else:
raise ValueError("Settings array is not properly formatted")
# bounds
# replace "inf" with np.inf
if len(settings[i]) > 1:
if len(settings[i]) == 3:
bounds[i] = [settings[i][1], settings[i][2]]
opt_mask[i] = True
elif settings[i][1] == "fix":
opt_mask[i] = False
else:
raise ValueError("Supplied value is not properly formatted")
else:
opt_mask[i] = True
if (bounds != np.array([[None, None]] * param_old.shape[0])).all():
param.add_bounds_param_space(bounds)
# When model is operating with transformed parameters
# input is expected in transformed space
param.add_opt_mask(opt_mask)
for i in range(param_old.shape[0]):
param.copy_at_param_space(param_old[i], i)
# update mutable param list
if name not in self.mutable_param_list and param.is_mutable:
self.mutable_param_list.append(name)
if name in self.mutable_param_list and not param.is_mutable:
self.mutable_param_list.remove(name)
[docs]
def echo_opt_params(self, filename: [Path, TextIO, None] = sys.stdout):
"""
Echo the optimizing parameter to a file.
"""
for param_key in self.model_params:
if self.model_params[param_key].is_mutable:
print(
f"Parameter:{param_key} : {self.model_params[param_key].get_numpy_array_model_space()}"
)
# return self.opt_params.echo_opt_params(filename)
[docs]
def get_num_opt_params(self) -> int:
"""
Count and return number of optimizable parameters.
Utilizes `Parameter` class.
"""
i = 0
for param_key in self.model_params:
if self.model_params[param_key].is_mutable:
i += (
self.model_params[param_key]
.get_opt_numpy_array_param_space()
.shape[0]
)
return i
[docs]
def get_opt_params(self) -> np.ndarray:
"""
Get optimizable parameters, concatenated as a single numpy array. Obtained numpy array is the state for
the optimizer to optimize.
Utilizes `Parameter` class.
"""
opt_param = np.array([])
for param_key in self.mutable_param_list:
if self.model_params[param_key].is_mutable: # additional check
opt_param = np.append(
opt_param,
self.model_params[param_key].get_opt_numpy_array_param_space(),
)
else:
# This should not happen
raise AttributeError(
f"Parameter {param_key}, is not optimizable. Please report this error"
)
return opt_param
[docs]
def update_model_params(
self, params: Union[np.ndarray, List[Union[float, int, Parameter]]]
):
"""
Copy and update the parameter from incoming params array. This method utilizes the
parameters internal function to copy the parameter in a consistent manner.
Args:
params: numpy array with the shape of optimized parameter concatenated array.
"""
i = 0
for param_key in self.mutable_param_list:
if self.model_params[param_key].is_mutable:
param_size = (
self.model_params[param_key]
.get_opt_numpy_array_param_space()
.shape[0]
)
self.model_params[param_key].copy_from_param_space(
params[i : i + param_size]
)
i += param_size
else:
raise AttributeError(
f"Parameter {param_key}, is not optimizable. Please report this error"
)
[docs]
def get_opt_param_name_value_and_indices(
self, index: int
) -> Tuple[str, Union[float, np.ndarray], int]:
for param_key in self.mutable_param_list:
if self.model_params[param_key].is_mutable:
if index == self.model_params[param_key].index:
return self.model_params[
param_key
].get_opt_param_name_value_and_indices()
raise ModelError(f"Index {index} not found in mutable parameters.")
[docs]
def opt_params_has_bounds(self) -> bool:
"""
Whether bounds are set for any of the parameters.
Returns:
boolean true if any of the parameters are marked mutable.
"""
has_bounds = False
for param in self.model_params:
if self.model_params[param].bounds is not None:
has_bounds = True
break
return has_bounds
[docs]
def save(self, filename: Path = "trained_model.yaml"):
"""
Save a model to disk.
Args:
filename: Path where to store the model.
"""
opt_params = {}
for param_key in self.model_params:
if self.model_params[param_key].is_mutable:
opt_params[param_key] = self.model_params[param_key].as_dict()
d = {
"@module": self.__class__.__module__,
"@class": self.__class__.__name__,
"opt_params": opt_params,
}
yaml_dump(d, filename)
[docs]
def load(self, filename: Path = "trained_model.yaml"):
"""
Load a model on disk into memory.
Args:
filename: Path where the model is stored.
"""
d = yaml_load(filename)
for param in d["opt_params"]:
self.model_params[param].from_dict(d["opt_params"][param])
[docs]
def named_parameters(self):
"""
Get a dict of parameters that are marked as mutable, and hence can be optimized.
The parameter values are subjected to change as per the transformations applied.
Returns:
Dictionary of parameters (~kliff.models.parameters.Parameter)
"""
param_opt_dict = {}
for param_key in self.model_params:
if self.model_params[param_key].is_mutable:
param_opt_dict[param_key] = self.model_params[param_key]
return param_opt_dict
[docs]
def parameters(self):
"""
Get a list of parameters that are marked as mutable, and hence can be optimized.
Returns:
List of parameters (~kliff.models.parameters.Parameter)
"""
param_opt_list = []
for param_key in self.model_params:
if self.model_params[param_key].is_mutable:
param_opt_list.append(self.model_params[param_key])
return param_opt_list
# def parameters(self):
# """
# Get an iterator of parameters that are marked as mutable, and hence can be optimized.
#
# Returns:
# Iterator of parameters (~kliff.models.parameters.Parameter)
# """
# param_opt_list = []
# for param_key in self.model_params:
# if self.model_params[param_key].is_mutable:
# yield self.model_params[param_key]
#
[docs]
class ModelError(Exception):
def __init__(self, msg):
super(ModelError, self).__init__(msg)
self.msg = msg