Source code for kliff.models.neural_network

import os
from pathlib import Path
from typing import List, Optional

import numpy as np
import torch
from loguru import logger

from kliff.legacy.descriptors.descriptor import Descriptor
from kliff.models.model_torch import ModelTorch


class NeuralNetwork(ModelTorch):
    """
    Neural Network model.

    A feed-forward neural network model.

    Args:
        descriptor: A descriptor that transforms atomic environment information to
            the fingerprints, which are used as the input for the neural network.
        seed: Global seed for random numbers.
    """

    def __init__(self, descriptor: Descriptor, seed: int = 35):
        super().__init__(descriptor, seed)

        # Filled in exactly once by `add_layers()`; `None` means "not built yet".
        self.layers = None

        logger.debug(f"`{self.__class__.__name__}` instantiated.")

    def add_layers(self, *layers):
        """
        Add layers to the sequential model.

        All checks are done before any layer is stored, so a failed call leaves
        the model untouched and `add_layers()` can be called again.

        Args:
            layers: ``torch.nn`` layers that are used to build a sequential model.
                Available ones including: torch.nn.Linear, torch.nn.Dropout, and
                torch.nn.Sigmoid among others. See
                https://pytorch.org/docs/stable/nn.html for a full list.

        Raises:
            NeuralNetworkError: if called after layers were already added, if no
                layer is given, if no layer has an `in_features` attribute, if
                `in_features` of the first such layer differs from the descriptor
                size, or if the last layer does not have `out_features == 1`.
        """
        if self.layers is not None:
            raise NeuralNetworkError(
                "`add_layers()` called multiple times. It should be called only once."
            )
        if not layers:
            raise NeuralNetworkError("`add_layers()` requires at least one layer.")

        # Leading layers without `in_features` (e.g. a `Dropout` acting on the
        # input) are skipped when looking for the layer that receives the
        # descriptor values.
        first = next((la for la in layers if hasattr(la, "in_features")), None)
        if first is None:
            raise NeuralNetworkError(
                "No layer with `in_features` found; cannot check the input size."
            )
        desc_size = self.descriptor.get_size()
        if first.in_features != desc_size:
            raise NeuralNetworkError(
                f"Expect `in_features` of first layer ({first.in_features}) be equal "
                f"to descriptor size ({desc_size})."
            )

        # `getattr` so that a last layer without `out_features` gives the same
        # error as one with the wrong size, instead of an `AttributeError`.
        if getattr(layers[-1], "out_features", None) != 1:
            raise NeuralNetworkError("`out_features` of last layer should be 1.")

        self.layers = list(layers)
        for idx, la in enumerate(self.layers, start=1):
            # set it as attr so that parameters are automatically registered
            setattr(self, "layer_{}".format(idx), la)

        # cast types
        self.type(self.dtype)

    def forward(self, x):
        """
        Forward pass through the neural network.

        The layers are applied one after another, in the order they were given to
        `add_layers()`.

        Args:
            x: input descriptor to the neural network.

        Returns:
            The output of the neural network.
        """
        for layer in self.layers:
            x = layer(x)
        return x

    def write_kim_model(
        self,
        path: Optional[Path] = None,
        driver_name: str = "DUNN__MD_292677547454_000",
        dropout_ensemble_size: Optional[int] = None,
    ):
        """
        Write out a model that is compatible with the KIM API.

        Four files are written into `path`: `CMakeLists.txt`, `NN.params`,
        `descriptor.params` and `dropout_binary.params`.

        Args:
            path: Path to write the model. If `None`, defaults to
                `./NeuralNetwork_KLIFF__MO_000000111111_000`.
            driver_name: Name of the model driver.
            dropout_ensemble_size: Size of the dropout ensemble. Ignored if not
                fitting a dropout NN. Otherwise, defaults to 100 if `None`.

        Raises:
            NeuralNetworkError: if the layer layout cannot be converted to a KIM
                model.
        """
        # Check the layer layout up front so that nothing is written to disk for
        # a network that cannot be converted.
        self._group_layers()

        if path is None:
            model_name = "NeuralNetwork_KLIFF__MO_000000111111_000"
            path = Path.cwd().joinpath(model_name)
        else:
            path = Path(path).expanduser().resolve()
            model_name = str(path.name)

        # `exist_ok=True` avoids the check-then-create race of `exists()` + `makedirs`.
        path.mkdir(parents=True, exist_ok=True)

        desc_name = "descriptor.params"
        nn_name = "NN.params"
        dropout_name = "dropout_binary.params"
        param_files = [desc_name, nn_name, dropout_name]

        self._write_kim_cmakelists(
            path, model_name, driver_name, param_files, version="2.0.0"
        )
        self._write_kim_params(path, nn_name)
        self.descriptor.write_kim_params(path, desc_name)
        self._write_kim_dropout_binary(path, dropout_name, dropout_ensemble_size)

        logger.info(f"KLIFF trained model written to {path}.")

    def _write_kim_params(self, path, filename="NN.params"):
        """
        Write the network structure, weights and biases to `path/filename`.

        Args:
            path: Directory (a `pathlib.Path`) to write the file to.
            filename: Name of the parameter file.

        Raises:
            NeuralNetworkError: if the network has no activation layer, since the
                file needs an activation function name.
        """
        weights, biases = self._get_weights_and_biases()
        activations = self._get_activations()
        drop_ratios = self._get_drop_ratios()

        # Fail before the file is opened so that no partial file is left behind.
        if not activations:
            raise NeuralNetworkError(
                "Cannot convert to KIM model. At least one hidden layer with an "
                "activation function is required."
            )
        # The file holds a single activation name; tell the user when the network
        # has more than one kind, because only the first is written.
        if len(set(activations)) > 1:
            logger.warning(
                f"Multiple activation functions found ({sorted(set(activations))}); "
                f"only the first one (`{activations[0]}`) is written to the KIM model."
            )

        # PyTorch uses x*W^T + b, so we need to transpose it.
        # see https://pytorch.org/docs/stable/nn.html#linear
        weights = [torch.t(w) for w in weights]
        num_layers = len(weights)

        # Number format depends on the precision the model is cast to.
        item_fmt = "{:23.15e}" if self.dtype == torch.float64 else "{:15.7e}"
        rule = "#" + "=" * 80

        with open(path.joinpath(filename), "w") as fout:
            # header
            fout.write(rule + "\n")
            fout.write(
                "# NN structure and parameters file generated by KLIFF\n"
                "# \n"
                '# Note that the NN assumes each row of the input "X" is an \n'
                "# observation, i.e. the layer is implemented as\n"
                "# Y = activation(XW + b).\n"
                '# You need to transpose your weight matrix if each column of "X" is \n'
                "# an observation.\n"
            )
            fout.write(rule + "\n\n")

            # number of layers
            fout.write(
                "{} # number of layers (excluding input layer,including output "
                "layer)\n".format(num_layers)
            )

            # size of layers
            fout.write("".join("{} ".format(len(b)) for b in biases))
            fout.write(" # size of each layer (last must be 1)\n")

            # activation function
            fout.write("{} # activation function\n".format(activations[0]))

            # keep probability
            fout.write("".join("{:.15g} ".format(1.0 - r) for r in drop_ratios))
            fout.write(" # keep probability of input for each layer\n\n")

            # weights and biases
            for i, (w, b) in enumerate(zip(weights, biases)):
                rows, cols = w.shape
                is_output = i == num_layers - 1

                # weight
                if is_output:
                    fout.write(
                        "# weight of output layer, shape({}, {})\n".format(rows, cols)
                    )
                else:
                    fout.write(
                        "# weight of hidden layer {}, shape({}, {})\n".format(
                            i + 1, rows, cols
                        )
                    )
                # Convert to plain Python floats once instead of formatting one
                # 0-d tensor at a time.
                for row in w.detach().cpu().tolist():
                    fout.write("".join(item_fmt.format(v) for v in row))
                    fout.write("\n")

                # bias
                if is_output:
                    fout.write("# bias of output layer, shape({}, )\n".format(cols))
                else:
                    fout.write(
                        "# bias of hidden layer {}, shape({}, )\n".format(i + 1, cols)
                    )
                fout.write("".join(item_fmt.format(v) for v in b.detach().cpu().tolist()))
                fout.write("\n\n")

    def _write_kim_dropout_binary(
        self, path, filename="dropout_binary.params", size=None
    ):
        """
        Write random 0/1 dropout masks to `path/filename`.

        For every ensemble instance, one line of 0/1 values is written per layer.
        The ensemble size is forced to 0 when all dropout ratios are (numerically)
        zero.

        Args:
            path: Directory (a `pathlib.Path`) to write the file to.
            filename: Name of the dropout file.
            size: Ensemble size. Defaults to 100 if `None` and dropout is used.
        """
        drop_ratios = self._get_drop_ratios()
        keep_prob = [1.0 - r for r in drop_ratios]
        _, biases = self._get_weights_and_biases()
        # Number of inputs of each layer: descriptor size first, then the layer sizes.
        num_units = [self.descriptor.get_size()] + [len(b) for b in biases]

        no_drop = np.all(np.asarray(drop_ratios) < 1e-10)
        if no_drop:
            size = 0
        elif size is None:
            size = 100

        rule = "#" + "=" * 80

        with open(path.joinpath(filename), "w") as fout:
            fout.write(rule + "\n")
            fout.write(
                "# Dropout binary parameters file generated by KLIFF.\n"
                "#\n"
                '# Note, "ensemble size = 0", means that no dropout needs to be\n'
                # The trailing newline keeps the closing rule on its own line.
                "# applied at all.\n"
            )
            fout.write(rule + "\n\n")

            fout.write("{} # ensemble size\n".format(size))

            for rep in range(size):
                fout.write(rule + "\n")
                fout.write("# instance {}\n".format(rep))
                # `zip` stops at `keep_prob`, which is one shorter than `num_units`.
                for i, (n, k) in enumerate(zip(num_units, keep_prob)):
                    fout.write("# layer {}\n".format(i))
                    # floor(U[k, k+1)) is 1 with probability k and 0 otherwise.
                    rnd = np.floor(np.random.uniform(k, k + 1, n))
                    mask = np.clip(np.asarray(rnd, dtype=np.intc), 0, 1)
                    fout.write("".join("{} ".format(d) for d in mask))
                    fout.write("\n")

    @staticmethod
    def _write_kim_cmakelists(
        path: Path, model_name: str, driver_name: str, param_files: List[str], version
    ):
        """
        Write the `CMakeLists.txt` of the KIM model into `path`.

        Args:
            path: Directory to write the file to.
            model_name: Value written after `NAME`.
            driver_name: Value written after `DRIVER_NAME`.
            param_files: Names written after `PARAMETER_FILES`, in the given order.
            version: Not used by this method.
        """
        quoted_files = "".join(' "{}"'.format(s) for s in param_files)
        content = [
            "#\n",
            "# Contributors:\n",
            "# KLIFF (https://kliff.readthedocs.io)\n",
            "#\n\n",
            "cmake_minimum_required(VERSION 3.4)\n\n",
            "list(APPEND CMAKE_PREFIX_PATH $ENV{KIM_API_CMAKE_PREFIX_DIR})\n",
            "find_package(KIM-API 2.0 REQUIRED CONFIG)\n",
            "if(NOT TARGET kim-api)\n",
            " enable_testing()\n",
            ' project("${KIM_API_PROJECT_NAME}" VERSION "${KIM_API_VERSION}"\n',
            " LANGUAGES CXX C Fortran)\n",
            "endif()\n\n",
            "add_kim_api_model_library(\n",
            f' NAME "{model_name}"\n',
            f' DRIVER_NAME "{driver_name}"\n',
            " PARAMETER_FILES",
            quoted_files,
            "\n",
            " )\n",
        ]
        with open(path.joinpath("CMakeLists.txt"), "w") as fout:
            fout.writelines(content)

    def _group_layers(
        self,
        param_layer=("Linear",),
        activ_layer=("Sigmoid", "Tanh", "ReLU", "ELU"),
        dropout_layer=("Dropout",),
    ):
        """
        Divide all the layers into groups.

        The first group is either an empty list or a `Dropout` layer for the input
        layer. The last group typically contains only a `Linear` layer. For other
        groups, each group contains two, or three layers. `Linear` layer and an
        activation layer are mandatory, and a third `Dropout` layer is optional.

        Args:
            param_layer: Class names of layers that have weights and biases.
            activ_layer: Class names of activation layers.
            dropout_layer: Class names of dropout layers.

        Returns:
            groups: list of list of layers
            param_layer: the `param_layer` argument, unchanged
            activ_layer: the `activ_layer` argument, unchanged
            dropout_layer: the `dropout_layer` argument, unchanged

        Raises:
            NeuralNetworkError: if a layer is not of a supported class, or if the
                order of the layers cannot be converted to a KIM model.
        """
        groups = []
        current = []
        supported = param_layer + activ_layer + dropout_layer
        prev_name = None

        for i, layer in enumerate(self.layers):
            name = layer.__class__.__name__

            if name not in supported:
                raise NeuralNetworkError(
                    f"Layer `{name}` not supported by KIM model. Cannot proceed "
                    "to write."
                )

            if name in activ_layer:
                if i == 0:
                    raise NeuralNetworkError(f"First layer cannot be a `{name}` layer")
                if prev_name not in param_layer:
                    raise NeuralNetworkError(
                        f"Cannot convert to KIM model. a `{name}` layer must follow "
                        'a "Linear" layer.'
                    )

            # A dropout layer at position 0 acts on the input and has no
            # predecessor; indexing `i - 1` there would wrap around to the LAST
            # layer, so the check only applies from the second layer on.
            if name in dropout_layer and i != 0 and prev_name not in activ_layer:
                raise NeuralNetworkError(
                    f"Cannot convert to KIM model. a `{name}` layer must follow "
                    "an activation layer."
                )

            # Every parameter layer closes the running group and opens a new one.
            if name in param_layer:
                groups.append(current)
                current = []
            current.append(layer)
            prev_name = name

        groups.append(current)

        return groups, param_layer, activ_layer, dropout_layer

    def _get_weights_and_biases(self):
        """
        Get weights and biases of all layers that have weights and biases.

        Returns:
            weights: list with the `weight` of the first layer of every group
                except the first one.
            biases: list with the matching `bias` attributes.
        """
        groups, supported, _, _ = self._group_layers()

        weights = []
        biases = []
        # The first group never starts with a parameter layer, so it is skipped.
        for g in groups[1:]:
            layer = g[0]
            if layer.__class__.__name__ in supported:
                weights.append(layer.weight)
                biases.append(layer.bias)

        return weights, biases

    def _get_activations(self):
        """
        Get the activation of all layers.

        Returns:
            List of lower-cased class names of the activation layers, one per
            group, excluding the first and the last group.

        Raises:
            NeuralNetworkError: if a group between the first and the last one has
                no second layer, i.e. a `Linear` layer without an activation.
        """
        groups, _, supported, _ = self._group_layers()

        activations = []
        for g in groups[1:-1]:
            if len(g) < 2:
                raise NeuralNetworkError(
                    "Cannot convert to KIM model. Each hidden `Linear` layer must be "
                    "followed by an activation layer."
                )
            name = g[1].__class__.__name__
            if name in supported:
                activations.append(name.lower())

        return activations

    def _get_drop_ratios(self):
        """
        Get the dropout ratio of all layers.

        Returns:
            List with one ratio per group, excluding the last group: the `p` of
            the group's dropout layer, or 0.0 if the group has none.
        """
        groups, _, _, supported = self._group_layers()
        last = len(groups) - 1

        drop_ratios = []
        for i, g in enumerate(groups):
            if i == 0:
                # Input group: empty, or a dropout layer applied to the input.
                if len(g) != 0:
                    layer = g[0]
                    if layer.__class__.__name__ in supported:
                        drop_ratios.append(layer.p)
                else:
                    drop_ratios.append(0.0)
            elif i == last:
                # Nothing is recorded for the last group.
                continue
            elif len(g) == 3:
                layer = g[2]
                if layer.__class__.__name__ in supported:
                    drop_ratios.append(layer.p)
            else:
                drop_ratios.append(0.0)

        return drop_ratios
class NeuralNetworkError(Exception):
    """
    Error raised by `NeuralNetwork` for invalid usage or unsupported layers.

    Args:
        msg: Error message. It is passed to `Exception` and also stored as the
            `msg` attribute.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
        self.msg = msg