import pickle
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
from loguru import logger
from kliff import parallel
from kliff.dataset import Configuration
from kliff.utils import create_directory, pickle_dump, to_path
[docs]class Descriptor:
"""
Base class of atomic environment descriptors.
Process dataset to generate fingerprints. This is the base class for all descriptors,
so it should not be used directly. Instead, descriptors built on top of this such as
:class:`~kliff.descriptors.SymmetryFunction` and
:class:`~kliff.descriptors.Bispectrum` can be used to transform the atomic environment
information into fingerprints.
Args:
cut_dists: Cutoff distances, with key of the form `A-B` where `A` and `B` are
species string, and value should be a float.
Example: `cut_dists = {'C-C': 5.0}`
cut_name: Name of the cutoff function, such as `cos`, `P3`, and `P7`.
hyperparams: A dictionary of the hyperparams of the descriptor or a string to
select the predefined hyperparams.
normalize: If `True`, the fingerprints is centered and normalized:
`zeta = (zeta - mean(zeta)) / stdev(zeta)`
dtype: np.dtype
Data type of the generated fingerprints, such as `np.float32` and `np.float64`.
Attributes:
size: int
Length of the fingerprint vector.
mean: list
Mean of the fingerprints.
stdev: list
Standard deviation of the fingerprints.
"""
def __init__(
self,
cut_dists: Dict[str, float],
cut_name: str,
hyperparams: Union[Dict, str],
normalize: bool = True,
dtype=np.float32,
):
self.cut_dists = cut_dists
self.cut_name = cut_name
self.hyperparams = hyperparams
self.normalize = normalize
self.dtype = dtype
# size, mean, and stdev of fingerprints; mean and stdev will be used only when
# `normalize=True`
self.size = None
self.mean = None
self.stdev = None
[docs] def generate_fingerprints(
self,
configs: List[Configuration],
fit_forces: bool = False,
fit_stress: bool = False,
fingerprints_filename: Union[Path, str] = "fingerprints.pkl",
fingerprints_mean_stdev_filename: Optional[Union[Path, str]] = None,
use_welford_method: bool = False,
nprocs: int = 1,
):
"""
Convert all configurations to their fingerprints.
Args:
configs: Dataset configurations
fit_forces: Whether to compute the gradient of fingerprints w.r.t. atomic
coordinates so as to compute forces.
fit_stress: Whether to compute the gradient of fingerprints w.r.t. atomic
coordinates so as to compute stress.
use_welford_method: Whether to compute mean and standard deviation using the
Welford method, which is memory efficient. See
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
fingerprints_filename: Path to dump fingerprints to a pickle file.
fingerprints_mean_stdev_filename: Path to dump the mean and standard
deviation of the fingerprints as a pickle file. If `normalize=False`
for the descriptor, this is ignored.
nprocs: Number of processes used to generate the fingerprints. If `1`, run
in serial mode, otherwise `nprocs` processes will be forked via
multiprocessing to do the work.
"""
if self.mean is not None and self.stdev is not None:
has_mean_stdev = True
else:
has_mean_stdev = False
# compute mean and stdev
if self.normalize and not has_mean_stdev:
logger.info("Start computing mean and stdev of fingerprints.")
if use_welford_method:
logger.info(
"Computing fingerprint mean and stdev using the Welford method."
)
self.mean, self.stdev = self._welford_mean_and_stdev(configs)
zeta, dzetadr_forces, dzetadr_stress = None, None, None
else:
zeta, dzetadr_forces, dzetadr_stress = self._calc_zeta_dzetadr(
configs, fit_forces, fit_stress, nprocs
)
stacked = np.concatenate(zeta)
self.mean = np.mean(stacked, axis=0)
self.stdev = np.std(stacked, axis=0)
logger.info("Finish computing mean and stdev of fingerprints.")
# save to a pickle file
if fingerprints_mean_stdev_filename is None:
fingerprints_mean_stdev_filename = "fingerprints_mean_and_stdev.pkl"
state_dict = self.state_dict()
pickle_dump(state_dict, fingerprints_mean_stdev_filename)
logger.info(
"Fingerprints mean and stdev saved to "
f"`{fingerprints_mean_stdev_filename}`."
)
else:
zeta, dzetadr_forces, dzetadr_stress = None, None, None
# generate fingerprints
self._dump_fingerprints(
configs,
fingerprints_filename,
zeta,
dzetadr_forces,
dzetadr_stress,
fit_forces,
fit_stress,
)
return fingerprints_filename
def _dump_fingerprints(
self,
configs,
fname,
all_zeta,
all_dzetadr_forces,
all_dzetadr_stress,
fit_forces,
fit_stress,
):
"""
Dump fingerprints to a pickle file.
"""
logger.info(f"Pickling fingerprints to `{fname}`")
create_directory(fname, is_directory=False)
# remove it, because we use append mode for the file below
fname = to_path(fname)
if fname.exists():
fname.unlink()
with open(fname, "ab") as f:
for i, conf in enumerate(configs):
if i % 100 == 0:
logger.info(f"Processing configuration: {i}.")
if all_zeta is None:
zeta, dzetadr_f, dzetadr_s = self.transform(
conf, fit_forces, fit_stress
)
else:
zeta = all_zeta[i]
dzetadr_f = all_dzetadr_forces[i]
dzetadr_s = all_dzetadr_stress[i]
# centering and normalization
if self.normalize:
zeta = (zeta - self.mean) / self.stdev
if fit_forces or fit_stress:
stdev_3d = np.atleast_3d(self.stdev)
if fit_forces:
dzetadr_f = dzetadr_f / stdev_3d
if fit_stress:
dzetadr_s = dzetadr_s / stdev_3d
# pickling data
zeta = np.asarray(zeta, self.dtype)
energy = np.asarray(conf.energy, self.dtype)
if fit_forces:
dzetadr_f = np.asarray(dzetadr_f, self.dtype)
forces = np.asarray(conf.forces, self.dtype)
if fit_stress:
dzetadr_s = np.asarray(dzetadr_s, self.dtype)
stress = np.asarray(conf.stress, self.dtype)
volume = np.asarray(conf.get_volume(), self.dtype)
example = {"configuration": conf, "zeta": zeta, "energy": energy}
if fit_forces:
example["dzetadr_forces"] = dzetadr_f
example["forces"] = forces
if fit_stress:
example["dzetadr_stress"] = dzetadr_s
example["stress"] = stress
example["volume"] = volume
pickle.dump(example, f)
logger.info(f"Pickle {len(configs)} configurations finished.")
def _calc_zeta_dzetadr(self, configs, fit_forces, fit_stress, nprocs=1):
"""
Calculate the fingerprints and maybe its gradients w.r.t the atomic coords.
"""
if nprocs == 1:
zeta = []
dzetadr_forces = []
dzetadr_stress = []
for conf in configs:
z, dzdr_f, dzdr_s = self.transform(conf, fit_forces, fit_stress)
zeta.append(z)
dzetadr_forces.append(dzdr_f)
dzetadr_stress.append(dzdr_s)
else:
rslt = parallel.parmap1(
self.transform, configs, fit_forces, fit_stress, nprocs=nprocs
)
zeta = [pair[0] for pair in rslt]
dzetadr_forces = [pair[1] for pair in rslt]
dzetadr_stress = [pair[2] for pair in rslt]
return zeta, dzetadr_forces, dzetadr_stress
def _welford_mean_and_stdev(self, configs: List[Configuration]):
"""
Compute the mean and standard deviation of fingerprints.
This running mean and standard method proposed by Welford is memory-efficient.
Besides, it outperforms the naive method from suffering numerical instability for
large dataset.
Args:
configs: Dataset class:`~kliff.dataset.Configuration`.
See Also:
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
"""
# number of features
conf = configs[0]
zeta, _, _ = self.transform(conf, fit_forces=False, fit_stress=False)
size = zeta.shape[1]
# Welford's method
n = 0
mean = np.zeros(size)
M2 = np.zeros(size)
for i, conf in enumerate(configs):
zeta, _, _ = self.transform(conf, fit_forces=False, fit_stress=False)
for row in zeta:
n += 1
delta = row - mean
mean += delta / n
delta2 = row - mean
M2 += delta * delta2
if i % 100 == 0:
logger.info("Processing configuration: {i}.")
sys.stdout.flush()
stdev = np.sqrt(M2 / n) # not the unbiased
return mean, stdev
[docs] def write_kim_params(
self, path: Union[Path, str], fname: str = "descriptor.params"
):
"""
Write descriptor info for KIM model.
Args:
path: Directory Path to write the file.
fname: Name of the file.
"""
raise NotImplementedError
[docs] def get_size(self):
"""Return the size of the descriptor vector."""
if self.mean is not None:
size = len(self.mean)
else:
size = self.size
return size
[docs] def get_mean(self):
"""Return a list of the mean of the fingerprints."""
return self.mean.copy()
[docs] def get_stdev(self):
"""Return a list of the standard deviation of the fingerprints."""
return self.stdev.copy()
[docs] def get_dtype(self):
"""Return the data type of the fingerprints."""
return self.dtype
[docs] def get_cutoff(self):
"""Return the name and values of cutoff."""
return self.cut_name, self.cut_dists
[docs] def get_hyperparams(self):
"""Return the hyperparameters of descriptors."""
return self.hyperparams
[docs] def state_dict(self) -> Dict[str, Any]:
"""
Return the state dict of the descriptor.
"""
data = {"mean": self.mean, "stdev": self.stdev, "size": self.size}
return data
[docs] def load_state_dict(self, data: Dict[str, Any]):
"""
Load state dict of a descriptor.
Args:
data: state dict to load.
"""
try:
mean = data["mean"]
stdev = data["stdev"]
size = data["size"]
except Exception as e:
raise DescriptorError(f"Corrupted state dict for descriptor: {str(e)}")
# more checks on data integrity
if mean is not None and stdev is not None and size is not None:
if len(mean.shape) != 1 or mean.shape[0] != size:
raise DescriptorError(f"Corrupted descriptor mean.")
if len(stdev.shape) != 1 or stdev.shape[0] != size:
raise DescriptorError("Corrupted descriptor standard deviation.")
self.mean = mean
self.stdev = stdev
self.size = size
def load_fingerprints(path: Union[Path, str]):
"""
Read preprocessed fingerprints from file.
This is the reverse operation of Descriptor._dump_fingerprints.
Args:
path: Path to the pickled data file.
Returns:
Fingerprints
"""
data = []
with open(path, "rb") as f:
try:
while True:
x = pickle.load(f)
data.append(x)
except EOFError:
pass
except Exception as e:
raise DescriptorError(f"Cannot load fingerprints from `{path}`. {str(e)}")
return data
def generate_full_cutoff(cutoff):
"""
Generate a full binary cutoff dictionary.
For species pair `S1-S2` in the ``cutoff`` dictionary, add key `S2-S1` to it, with
the same value as `S1-S2`.
Args:
cutoff: Cutoff dictionary with key of the form ``A-B`` where ``A`` and ``B``
are atomic species, and value should be a float.
Returns:
A dictionary with all combination of species as keys.
Example:
>>> cutoff = {'C-C': 4.0, 'C-H':3.5}
>>> generate_full_cutoff(cutoff)
{'C-C': 4.0, 'C-H':3.5, 'H-C':3.5}
"""
rcut2 = dict()
for key, val in cutoff.items():
s1, s2 = key.split("-")
if s1 != s2:
reverse_key = s2 + "-" + s1
if reverse_key in cutoff and cutoff[reverse_key] != val:
raise Exception(
'Corrupted cutoff dictionary. cutoff["{0}-{1}"] != '
'cutoff["{1}-{0}"].'.format(s1, s2)
)
else:
rcut2[reverse_key] = val
# merge
rcut2.update(cutoff)
return rcut2
def generate_unique_cutoff_pairs(cutoff):
"""
Generate a full binary cutoff dictionary.
For species pair `S1-S2` in the ``cutoff`` dictionary, remove key `S2-S1` from it if
`S1` is different from `S2`.
Args:
cutoff: Cutoff dictionary with key of the form ``A-B`` where ``A`` and ``B``
are atomic species, and value should be a float.
Returns:
A dictionary with unique species pair as keys.
Example:
>>> cutoff = {'C-C': 4.0, 'C-H':3.5, 'H-C':3.5}
>>> generate_unique_cutoff_pairs(cutoff)
{'C-C': 4.0, 'C-H':3.5}
"""
rcut2 = dict()
for key, val in cutoff.items():
s1, s2 = key.split("-")
reverse_key = s2 + "-" + s1
if key not in rcut2 and reverse_key not in rcut2:
rcut2[key] = val
return rcut2
def generate_species_code(cutoff):
"""
Generate species code info from cutoff dictionary.
Args:
cutoff: Cutoff dictionary with key of the form ``A-B`` where ``A`` and ``B``
are atomic species, and value should be a float.
Returns:
species_code: A dictionary of species and the integer code (starting from 0),
with keys the species in ``cutoff`` keys, and values integer code for species.
Example:
>>> cutoff = {'C-C': 4.0, 'C-H':3.5}
>>> generate_species_code(cutoff)
{'C':0, 'H':1}
"""
species = set()
for key in cutoff:
s1, s2 = key.split("-")
species.update([s1, s2])
species = list(species)
species_code = {s: i for i, s in enumerate(species)}
return species_code
class DescriptorError(Exception):
def __init__(self, msg):
super(DescriptorError, self).__init__(msg)
self.msg = msg