Source code for kliff.uq.mcmc

import warnings
from typing import Callable, List, Optional

import numpy as np

from kliff.error import report_import_error
from kliff.loss import Loss

try:
    import emcee

    emcee_avail = True
except ImportError:
    emcee_avail = False

try:
    import ptemcee

    ptemcee_avail = True
except ImportError:
    ptemcee_avail = False


def logprior_uniform(x: np.ndarray, bounds: np.ndarray) -> float:
    """Logarithm of the non-normalized joint uniform prior. This is the default prior
    distribution.

    Parameters
    ----------
    x : np.ndarray
        Parameter values to evaluate.
    bounds : np.ndarray (nparams, 2)
        An array containing the boundaries of the uniform prior. The first column of the
        array contains the lower bounds and the second column contains the upper bounds.

    Returns
    -------
    float
        Logarithm of the non-normalized joint uniform prior evaluated at parameter ``x``.
    """
    l_bounds, u_bounds = bounds.T
    if all(np.less(x, u_bounds)) and all(np.greater(x, l_bounds)):
        ret = 0.0
    else:
        ret = -np.inf

    return ret
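

# Usage sketch (illustrative only, not part of the module): evaluating the
# default uniform log-prior inside and outside its support. The bounds values
# below are hypothetical.
def _example_logprior_uniform():
    bounds = np.array([[0.0, 1.0], [-2.0, 2.0]])  # one [lower, upper] row per parameter
    inside = logprior_uniform(np.array([0.5, 0.0]), bounds)  # -> 0.0
    outside = logprior_uniform(np.array([1.5, 0.0]), bounds)  # -> -inf
    return inside, outside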


def get_T0(loss: Loss):
    """Compute the natural temperature.

    The minimum loss is the loss value at the optimal parameters.

    Parameters
    ----------
    loss : Loss
        Loss function class from :class:`~kliff.loss.Loss`.

    Returns
    -------
    float
        Value of the natural temperature.
    """
    xopt = loss.calculator.get_opt_params()
    ndim = len(xopt)
    C0 = loss._get_loss(xopt)
    return 2 * C0 / ndim
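

# A worked sketch (not part of the module): the natural temperature follows
# T0 = 2 * C0 / ndim, where C0 is the minimum value of the loss and ndim is
# the number of tunable parameters. The numbers below are hypothetical; with a
# real KLIFF ``Loss`` instance one would simply call ``get_T0(loss)``.
def _example_natural_temperature():
    C0 = 6.0  # hypothetical minimum loss value
    ndim = 4  # hypothetical number of tunable parameters
    T0 = 2 * C0 / ndim  # -> 3.0
    return T0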


class MCMC:
    """MCMC sampler class for Bayesian uncertainty quantification.

    This is a wrapper over :class:`PtemceeSampler` and :class:`EmceeSampler`.
    Currently, only these two samplers are implemented.

    Parameters
    ----------
    loss : Loss
        Loss function class from :class:`~kliff.loss.Loss`.
    nwalkers : Optional[int]
        Number of walkers to simulate. The minimum number of walkers is twice
        the number of parameters. It defaults to this minimum value.
    logprior_fn : Optional[Callable]
        A function that evaluates the logarithm of the prior distribution. The
        prior doesn't need to be normalized. It defaults to a uniform prior
        over a finite range.
    logprior_args : Optional[tuple]
        Additional positional arguments of ``logprior_fn``. If the default
        ``logprior_fn`` is used, then the boundaries of the uniform prior can
        be specified here.
    sampler : Optional[str] or sampler instance
        An argument that specifies the MCMC sampler to use. The value can be
        one of the strings ``"ptemcee"`` (the default value) or ``"emcee"``,
        or a sampler class instance. If ``"ptemcee"`` or ``"emcee"`` is given,
        the respective internal sampler class will be used.
    **kwargs : Optional[dict]
        Additional keyword arguments for ``ptemcee.Sampler`` or
        ``emcee.EnsembleSampler``.
    """

    builtin_samplers = ["ptemcee", "emcee"]

    def __new__(
        cls,
        loss: Loss,
        nwalkers: Optional[int] = None,
        logprior_fn: Optional[Callable] = None,
        logprior_args: Optional[tuple] = None,
        sampler: Optional[str] = "ptemcee",
        **kwargs,
    ):
        if sampler == "ptemcee":
            if ptemcee_avail:
                # Force pool to be declared after MCMC is instantiated
                if "pool" in kwargs:
                    raise ValueError(
                        "Please declare the pool after instantiating ``kliff.uq.MCMC``"
                    )
                # Pre-process keyword arguments
                ntemps = kwargs.pop("ntemps", 10)
                Tmax_ratio = kwargs.pop("Tmax_ratio", 1.0)
                Tladder = kwargs.pop("Tladder", None)
                return PtemceeSampler(
                    loss,
                    nwalkers,
                    ntemps,
                    Tmax_ratio,
                    Tladder,
                    logprior_fn,
                    logprior_args,
                    **kwargs,
                )
            else:
                report_import_error("ptemcee")
        elif sampler == "emcee":
            if emcee_avail:
                # Force pool to be declared after MCMC is instantiated
                if "pool" in kwargs:
                    raise ValueError(
                        "Please declare the pool after instantiating ``kliff.uq.MCMC``"
                    )
                # Pre-process keyword arguments
                T = kwargs.pop("T", None)
                return EmceeSampler(
                    loss, nwalkers, T, logprior_fn, logprior_args, **kwargs
                )
            else:
                report_import_error("emcee")
        elif isinstance(sampler, str) and sampler not in cls.builtin_samplers:
            raise ValueError(
                "Unknown sampler. Please input 'ptemcee', 'emcee', or some "
                "sampler class instance"
            )
        else:
            warnings.warn("The other arguments are ignored")
            return sampler
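

# Usage sketch (illustrative only; constructing the ``loss`` object is
# omitted and its parameters are assumed to have finite bounds). Built-in
# samplers are selected by name; extra keyword arguments are forwarded to the
# underlying sampler class.
def _example_sampler_selection(loss):
    # Parallel-tempered sampling with a hypothetical 4-temperature ladder.
    pt_sampler = MCMC(loss, sampler="ptemcee", ntemps=4)
    # Affine-invariant sampling with the untempered likelihood (T = 1).
    ai_sampler = MCMC(loss, sampler="emcee", T=1.0)
    return pt_sampler, ai_sampler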


if ptemcee_avail:

    class PtemceeSampler(ptemcee.Sampler):
        """Sampler class for parallel-tempered MCMC (PTMCMC) via the
        ``ptemcee`` Python package.

        Parameters
        ----------
        loss : Loss
            Loss function instance from :class:`~kliff.loss.Loss`.
        nwalkers : Optional[int]
            Number of walkers to simulate. The minimum number of walkers is
            twice the number of parameters. It defaults to this minimum value.
        ntemps : Optional[int]
            Number of temperatures to simulate. It defaults to 10.
        Tmax_ratio : Optional[float]
            The ratio between the highest temperature to use and the natural
            temperature. A higher value means that the maximum temperature is
            higher than :math:`T_0` [Frederiksen2004]_. It defaults to 1.0.
        Tladder : Optional[List]
            A list containing the temperature values to use. The values need
            to be monotonically increasing or decreasing. It defaults to
            ``None``, in which case the ladder is generated from ``ntemps``
            and ``Tmax_ratio``.
        logprior_fn : Optional[Callable]
            A function that evaluates the logarithm of the prior distribution.
            The prior doesn't need to be normalized. It defaults to a uniform
            prior over a finite range.
        logprior_args : Optional[tuple]
            Additional positional arguments of ``logprior_fn``. If the default
            ``logprior_fn`` is used, then the boundaries of the uniform prior
            can be specified here.
        **kwargs : Optional[dict]
            Additional keyword arguments for ``ptemcee.Sampler``.

        Attributes
        ----------
        loss : Loss
            Loss function instance from :class:`~kliff.loss.Loss`.
        T0 : float
            Value of the natural temperature, :math:`T_0` [Frederiksen2004]_.
        Tladder : np.ndarray
            An array containing the values of the sampling temperatures.
        sampler : ptemcee.Sampler
            Sampler instance from ``ptemcee.Sampler``.
        """

        def __init__(
            self,
            loss: Loss,
            nwalkers: Optional[int] = None,
            ntemps: Optional[int] = 10,
            Tmax_ratio: Optional[float] = 1.0,
            Tladder: Optional[List] = None,
            logprior_fn: Optional[Callable] = None,
            logprior_args: Optional[tuple] = None,
            **kwargs,
        ):
            self.loss = loss

            # Dimensionality
            ndim = loss.calculator.get_num_opt_params()
            nwalkers = 2 * ndim if nwalkers is None else nwalkers

            # Probability. Declaring the function global binds it at module
            # level, so that it can be pickled when a parallel pool is used.
            global loglikelihood_fn

            def loglikelihood_fn(x):
                return _get_loglikelihood(x, loss)

            if logprior_fn is None:
                logprior_fn = logprior_uniform
            if logprior_args is None:
                logprior_args = (_get_parameter_bounds(loss),)

            # Sampling temperatures
            self.T0 = None
            self.Tladder = self._generate_temp_ladder(ntemps, Tmax_ratio, Tladder)
            betas = 1 / self.Tladder

            super().__init__(
                nwalkers,
                ndim,
                loglikelihood_fn,
                logprior_fn,
                logpargs=logprior_args,
                betas=betas,
                **kwargs,
            )

        def _generate_temp_ladder(self, ntemps, Tmax_ratio, Tladder):
            """Generate the temperature ladder."""
            # Only generate the temperature ladder when it is not specified.
            if Tladder is None:
                # Compute T0
                self.T0 = get_T0(self.loss)
                Tmax = Tmax_ratio * self.T0
                Tmax_not_T0 = Tmax_ratio != 1.0
                # If Tmax is not T0, generate one temperature fewer than
                # requested and append T0 afterward.
                if Tmax_not_T0:
                    ntemps -= 1
                Tladder = np.logspace(0, np.log10(Tmax), ntemps)
                if Tmax_not_T0:
                    Tladder = np.sort(np.append(Tladder, self.T0))
            return Tladder


if emcee_avail:

    class EmceeSampler(emcee.EnsembleSampler):
        """Sampler class for affine-invariant MCMC via the ``emcee`` Python
        package.

        Parameters
        ----------
        loss : Loss
            Loss function instance from :class:`~kliff.loss.Loss`.
        nwalkers : Optional[int]
            Number of walkers to simulate. The minimum number of walkers is
            twice the number of parameters. It defaults to this minimum value.
        T : Optional[float]
            Sampling temperature, used to inflate the likelihood function in
            the MCMC sampling. It defaults to the natural temperature
            :math:`T_0` [Frederiksen2004]_.
        logprior_fn : Optional[Callable]
            A function that evaluates the logarithm of the prior distribution.
            The prior doesn't need to be normalized. It defaults to a uniform
            prior over a finite range.
        logprior_args : Optional[tuple]
            Additional positional arguments of ``logprior_fn``. If the default
            ``logprior_fn`` is used, then the boundaries of the uniform prior
            can be specified here.
        **kwargs : Optional[dict]
            Additional keyword arguments for ``emcee.EnsembleSampler``.

        Attributes
        ----------
        loss : Loss
            Loss function instance from :class:`~kliff.loss.Loss`.
        T : float
            Value of the sampling temperature.
        sampler : emcee.EnsembleSampler
            Sampler instance from ``emcee.EnsembleSampler``.

        Notes
        -----
        As a convention, KLIFF inflates the likelihood by some sampling
        temperature, i.e., :math:`L(\\theta) \\propto \\exp(-C(\\theta) / T)`.
        By default, the sampling temperature is set to the natural
        temperature. To use the untempered likelihood (:math:`T = 1`), the
        user should specify the argument ``T=1``.

        References
        ----------
        .. [Frederiksen2004] S. L. Frederiksen, K. W. Jacobsen, K. S. Brown,
           and J. P. Sethna, "Bayesian Ensemble Approach to Error Estimation
           of Interatomic Potentials," Phys. Rev. Lett., vol. 93, no. 16,
           p. 165501, Oct. 2004, doi: 10.1103/PhysRevLett.93.165501.
        """

        def __init__(
            self,
            loss: Loss,
            nwalkers: Optional[int] = None,
            T: Optional[float] = None,
            logprior_fn: Optional[Callable] = None,
            logprior_args: Optional[tuple] = None,
            **kwargs,
        ):
            self.loss = loss

            # Dimensionality
            ndim = loss.calculator.get_num_opt_params()
            nwalkers = 2 * ndim if nwalkers is None else nwalkers

            # Probability
            if T is None:
                self.T = get_T0(self.loss)
            else:
                self.T = T
            logl_fn = self._loglikelihood_wrapper
            # Guard against ``logprior_args=None``, which cannot be unpacked.
            logp_fn = self._logprior_wrapper(logprior_fn, *(logprior_args or ()))

            # Declaring the function global binds it at module level, so that
            # it can be pickled when a parallel pool is used.
            global logprobability_fn

            def logprobability_fn(x):
                return logl_fn(x) + logp_fn(x)

            super().__init__(nwalkers, ndim, logprobability_fn, **kwargs)

        def _loglikelihood_wrapper(self, x):
            """A wrapper to the log-likelihood function, so that the only
            argument is the parameter values.
            """
            return _get_loglikelihood(x, self.loss, self.T)

        def _logprior_wrapper(self, logprior_fn, *logprior_args):
            """A wrapper to the log-prior function, so that the only argument
            is the parameter values.
            """
            if logprior_fn is None:
                # ``logprior_args`` is collected as a tuple here, so check for
                # emptiness rather than ``None``.
                if not logprior_args:
                    logprior_args = (_get_parameter_bounds(self.loss),)
                logprior_fn = logprior_uniform

            def logp(x):
                return logprior_fn(x, *logprior_args)

            return logp


def _get_loglikelihood(x: np.ndarray, loss: Loss, T: Optional[float] = 1.0):
    """Compute the log-likelihood from the cost function. It has an option to
    temper the cost by specifying ``T``.
    """
    cost = loss._get_loss(x)
    logl = -cost / T
    return logl


def _get_parameter_bounds(loss):
    """Get the parameter bounds for the default uniform prior."""
    bounds = loss.calculator.get_opt_params_bounds()
    if np.any(np.isin(bounds, None)):
        raise MCMCError(
            "Improper uniform prior bound is used. Please specify a finite range."
        )
    return np.array(bounds)


class MCMCError(Exception):
    def __init__(self, msg):
        super().__init__(msg)
        self.msg = msg
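

# End-to-end sketch (illustrative only, not part of the module), assuming the
# emcee 3.x ``run_mcmc``/``get_chain`` API and a ready-made ``loss`` whose
# parameters have finite bounds. Walkers are initialized by slightly
# perturbing the optimal parameters.
def _example_run_emcee(loss, nsteps=100):
    sampler = MCMC(loss, sampler="emcee")
    ndim = loss.calculator.get_num_opt_params()
    nwalkers = 2 * ndim  # the default number of walkers used above
    xopt = loss.calculator.get_opt_params()
    p0 = xopt + 0.01 * np.random.randn(nwalkers, ndim)
    # For the ptemcee-based sampler, p0 would instead have shape
    # (ntemps, nwalkers, ndim).
    sampler.run_mcmc(p0, nsteps)
    return sampler.get_chain()  # shape: (nsteps, nwalkers, ndim)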