import warnings
from typing import Callable, List, Optional
import numpy as np
from kliff.error import report_import_error
from kliff.legacy.loss import Loss
try:
import emcee
emcee_avail = True
except ImportError:
emcee_avail = False
try:
import ptemcee
ptemcee_avail = True
except ImportError:
ptemcee_avail = False
[docs]
def get_T0(loss: Loss) -> float:
"""Compute the natural temperature.
The minimum loss is the loss value at the optimal parameters.
Args:
loss: Loss function class from :class:`~kliff.legacy.loss.Loss`.
Returns:
Value of the natural temperature.
"""
xopt = loss.calculator.get_opt_params()
ndim = len(xopt)
C0 = loss._get_loss(xopt)
return 2 * C0 / ndim
[docs]
class MCMC:
"""MCMC sampler class for Bayesian uncertainty quantification.
This is a wrapper over :class:`PtemceeSampler` and
:class:`EmceeSampler`. Currently, only these 2 samplers implemented.
Args:
loss: Loss function class from :class:`~kliff.legacy.loss.Loss`.
nwalkers: Number of walkers to simulate. The minimum number of walkers is twice
the number of parameters. It defaults to this minimum value.
logprior_fn: A function that evaluate logarithm of the prior distribution. The
prior doesn't need to be normalized. It defaults to a uniform prior over a
finite range.
logprior_args: Additional positional arguments of the ``logprior_fn``. If the
default ``logprior_fn`` is used, then the boundaries of the uniform prior can
be specified here.
sampler: An argument that specifies the MCMC sampler to use. The value can be one
of the strings ``"ptemcee"`` (the default value) or ``"emcee"``, or a sampler
class instance. If ``"ptemcee"`` or ``"emcee"`` is given, a respective
internal sampler class will be uses.
**kwargs: Additional keyword arguments for ``ptemcee.Sampler`` or ``emcee.EnsembleSampler``.
"""
builtin_samplers = ["ptemcee", "emcee"]
def __new__(
self,
loss: Loss,
nwalkers: Optional[int] = None,
logprior_fn: Optional[Callable] = None,
logprior_args: Optional[tuple] = None,
sampler: Optional[str] = "ptemcee",
**kwargs,
):
if sampler == "ptemcee":
if ptemcee_avail:
# Force pool to be declared after MCMC is instantiated
if "pool" in kwargs:
raise ValueError(
"Please declare the pool after instantiating ``kliff.uq.MCMC``"
)
# Pre-process keyword arguments
ntemps = kwargs.pop("ntemps") if "ntemps" in kwargs else 10
Tmax_ratio = kwargs.pop("Tmax_ratio") if "Tmax_ratio" in kwargs else 1.0
Tladder = kwargs.pop("Tladder") if "Tladder" in kwargs else None
return PtemceeSampler(
loss,
nwalkers,
ntemps,
Tmax_ratio,
Tladder,
logprior_fn,
logprior_args,
**kwargs,
)
else:
report_import_error("ptemcee")
elif sampler == "emcee":
if emcee_avail:
# Force pool to be declared after MCMC is instantiated
if "pool" in kwargs:
raise ValueError(
"Please declare the pool after instantiating ``kliff.uq.MCMC``"
)
# Pre-process keyword arguments
T = kwargs.pop("T") if "T" in kwargs else None
return EmceeSampler(
loss, nwalkers, T, logprior_fn, logprior_args, **kwargs
)
else:
report_import_error("emcee")
elif isinstance(sampler, str) and sampler not in self.builtin_samplers:
raise ValueError(
"Unknown sampler. Please input 'ptemcee', 'emcee' or "
"some sampler class instance"
)
else:
warnings.warn("The other arguments are ignored")
return sampler
if ptemcee_avail:
class PtemceeSampler(ptemcee.Sampler):
"""Sampler class for PTMCMC via ``ptemcee`` Python package.
Args:
loss: Loss function instance from :class:`~kliff.legacy.loss.Loss`.
nwalkers: Number of walkers to simulate. The minimum number of walkers is
twice the number of parameters. It defaults to this minimum value.
ntemps: Number of temperatures to simulate. It defaults to 10.
Tmax_ratio: The ratio between the highest temperature to use and the natural
temperature. Higher value means that the maximum temperature is higher
than :math:`T_0` [Frederiksen2004]_. It defaults to 1.0.
Tladder: A list containing the temperature values to use. The values nedd to
be monotonically increasing or decreasing. It defaults to ``None``, which
will be generated from ``ntemps`` and ``Tmax_ratio``.
logprior_fn: A function that evaluate logarithm of the prior distribution.
The prior doesn't need to be normalized. It defaults to a uniform prior
over a finite range.
logprior_args: Additional positional arguments of the ``logprior_fn``. If the
default ``logprior_fn`` is used, then the boundaries of the uniform prior
can be specified here.
**kwargs: Additional keyword arguments for ``ptemcee.Sampler``.
Attributes:
loss: Loss function instance from :class:`~kliff.legacy.loss.Loss`
T0: Values of the natural temperature, :math:`T_0` [Frederiksen2004]_.
Tladder: An array containing the values of the sampling temperatures.
sampler: Sampler instance from ``ptemcee.Sampler``
"""
def __init__(
self,
loss: Loss,
nwalkers: Optional[int] = None,
ntemps: Optional[int] = 10,
Tmax_ratio: Optional[float] = 1.0,
Tladder: Optional[List] = None,
logprior_fn: Optional[Callable] = None,
logprior_args: Optional[tuple] = None,
**kwargs,
):
self.loss = loss
# Dimensionality
ndim = loss.calculator.get_num_opt_params()
nwalkers = 2 * ndim if nwalkers is None else nwalkers
# Probability
global loglikelihood_fn
def loglikelihood_fn(x):
return _get_loglikelihood(x, loss)
if logprior_fn is None:
logprior_fn = logprior_uniform
if logprior_args is None:
logprior_args = (_get_parameter_bounds(loss),)
# Sampling temperatures
self.T0 = None
self.Tladder = self._generate_temp_ladder(ntemps, Tmax_ratio, Tladder)
betas = 1 / self.Tladder
super().__init__(
nwalkers,
ndim,
loglikelihood_fn,
logprior_fn,
logpargs=logprior_args,
betas=betas,
**kwargs,
)
def _generate_temp_ladder(self, ntemps, Tmax_ratio, Tladder):
"""Generate temperature ladder"""
# Only generate temperature ladder when it is not specified.
if Tladder is None:
# Compute T0
self.T0 = get_T0(self.loss)
Tmax = Tmax_ratio * self.T0
Tmax_not_T0 = Tmax_ratio != 1.0
# If Tmax is not T0, then we need to generate 1 less temperature than
# requested, and append T0 afterward.
if Tmax_not_T0:
ntemps -= 1
Tladder = np.logspace(0, np.log10(Tmax), ntemps)
if Tmax_not_T0:
Tladder = np.sort(np.append(Tladder, self.T0))
return Tladder
if emcee_avail:
class EmceeSampler(emcee.EnsembleSampler):
"""Sampler class for affine invariant MCMC via ``emcee`` Python package.
Args:
loss: Loss function instance from :class:`~kliff.legacy.loss.Loss`.
nwalkers: Number of walkers to simulate. The minimum number of walkers is
twice the number of parameters. It defaults to this minimum value.
T: Sampling temperatures, used to inflate the likelihood function in the MCMC
sampling. It defaults to the natural temperature :math:`T_0` [Frederiksen2004]_.
logprior_fn: A function that evaluate logarithm of the prior distribution.
The prior doesn't need to be normalized. It defaults to a uniform prior
over a finite range.
logprior_args: Additional positional arguments of the ``logprior_fn``. If the
default ``logprior_fn`` is used, then the boundaries of the uniform prior
can be specified here.
**kwargs: Additional keyword arguments for ``emcee.EnsembleSampler``.
Attributes:
loss: Loss function instance from :class:`~kliff.legacy.loss.Loss`
T: Values of the sampling temperature.
sampler: Sampler instance from ``emcee.EnsembleSampler``
Notes:
As a convention, KLIFF inflates the likelihood by some sampling temperature,
i.e., :math:`L(\\theta) \propto \exp(-C(\\theta) / T)`. As a default, the
sampling temperature is set to the natural temperature. To use the untempered
likelihood (:math:`T=1`), user should specify the argument ``T=1``.
References:
.. [Frederiksen2004] S. L. Frederiksen, K. W. Jacobsen, K. S. Brown, and J. P.
Sethna, “Bayesian Ensemble Approach to Error Estimation of Interatomic
Potentials,” Phys. Rev. Lett., vol. 93, no. 16, p. 165501, Oct. 2004,
doi: 10.1103/PhysRevLett.93.165501.
"""
def __init__(
self,
loss: Loss,
nwalkers: Optional[int] = None,
T: Optional[float] = None,
logprior_fn: Optional[Callable] = None,
logprior_args: Optional[tuple] = None,
**kwargs,
):
self.loss = loss
# Dimensionality
ndim = loss.calculator.get_num_opt_params()
nwalkers = 2 * ndim if nwalkers is None else nwalkers
# Probability
if T is None:
self.T = get_T0(self.loss)
else:
self.T = T
logl_fn = self._loglikelihood_wrapper
logp_fn = self._logprior_wrapper(logprior_fn, *logprior_args)
global logprobability_fn
def logprobability_fn(x):
return logl_fn(x) + logp_fn(x)
super().__init__(nwalkers, ndim, logprobability_fn, **kwargs)
def _loglikelihood_wrapper(self, x):
"""A wrapper to the log-likelihood function, so that the only argument is the
parameter values.
"""
return _get_loglikelihood(x, self.loss, self.T)
def _logprior_wrapper(self, logprior_fn, *logprior_args):
"""A wapper to the log-prior function, so that the only argument is the
parameter values.
"""
if logprior_fn is None:
if logprior_args is None:
logprior_args = (_get_parameter_bounds(self.loss),)
logprior_fn = logprior_uniform
def logp(x):
return logprior_fn(x, *logprior_args)
return logp
def _get_loglikelihood(x: np.ndarray, loss: Loss, T: Optional[float] = 1.0):
"""Compute the log-likelihood from the cost function. It has an option to temper the
cost by specifying ``T``.
"""
cost = loss._get_loss(x)
logl = -cost / T
return logl
def _get_parameter_bounds(loss):
"""Get the parameter bounds for the default uniform prior."""
bounds = loss.calculator.get_opt_params_bounds()
if np.any(np.isin(bounds, None)):
raise MCMCError(
"Improper uniform prior bound is used. Please specify a finite range."
)
return np.array(bounds)
[docs]
class MCMCError(Exception):
def __init__(self, msg):
super(MCMCError, self).__init__(msg)
self.msg = msg