diff --git a/doc/example/distributions.ipynb b/doc/example/distributions.ipynb
index 5c285a0a..6442a548 100644
--- a/doc/example/distributions.ipynb
+++ b/doc/example/distributions.ipynb
@@ -32,34 +32,34 @@
     "import seaborn as sns\n",
     "\n",
     "from petab.v1.C import *\n",
-    "from petab.v1.distributions import *\n",
+    "from petab.v1.priors import Prior\n",
     "\n",
     "sns.set_style(None)\n",
     "\n",
     "\n",
-    "def plot(distr: Distribution, ax=None):\n",
+    "def plot(prior: Prior, ax=None):\n",
     "    \"\"\"Visualize a distribution.\"\"\"\n",
     "    if ax is None:\n",
     "        fig, ax = plt.subplots()\n",
     "\n",
-    "    sample = distr.sample(10000)\n",
+    "    sample = prior.sample(10000)\n",
     "\n",
     "    # pdf\n",
-    "    xmin = min(sample.min(), distr.lb_scaled if distr.bounds is not None else sample.min())\n",
-    "    xmax = max(sample.max(), distr.ub_scaled if distr.bounds is not None else sample.max())\n",
+    "    xmin = min(sample.min(), prior.lb_scaled if prior.bounds is not None else sample.min())\n",
+    "    xmax = max(sample.max(), prior.ub_scaled if prior.bounds is not None else sample.max())\n",
     "    x = np.linspace(xmin, xmax, 500)\n",
-    "    y = distr.pdf(x)\n",
+    "    y = prior.pdf(x)\n",
     "    ax.plot(x, y, color='red', label='pdf')\n",
     "\n",
     "    sns.histplot(sample, stat='density', ax=ax, label=\"sample\")\n",
     "\n",
     "    # bounds\n",
-    "    if distr.bounds is not None:\n",
-    "        for bound in (distr.lb_scaled, distr.ub_scaled):\n",
+    "    if prior.bounds is not None:\n",
+    "        for bound in (prior.lb_scaled, prior.ub_scaled):\n",
     "            if bound is not None and np.isfinite(bound):\n",
     "                ax.axvline(bound, color='black', linestyle='--', label='bound')\n",
     "\n",
-    "    ax.set_title(str(distr))\n",
+    "    ax.set_title(str(prior))\n",
     "    ax.set_xlabel('Parameter value on the parameter scale')\n",
     "    ax.grid(False)\n",
     "    handles, labels = ax.get_legend_handles_labels()\n",
@@ -81,11 +81,11 @@ "metadata": {},
    "cell_type": "code",
    "source": [
-    "plot(Uniform(0, 1))\n",
-    "plot(Normal(0, 1))\n",
-    "plot(Laplace(0, 1))\n",
-    "plot(LogNormal(0, 1))\n",
-    "plot(LogLaplace(1, 0.5))"
+    "plot(Prior(UNIFORM, (0, 1)))\n",
+    "plot(Prior(NORMAL, (0, 1)))\n",
+    "plot(Prior(LAPLACE, (0, 1)))\n",
+    "plot(Prior(LOG_NORMAL, (0, 1)))\n",
+    "plot(Prior(LOG_LAPLACE, (1, 0.5)))"
    ],
    "id": "4f09e50a3db06d9f",
    "outputs": [],
    "execution_count": null
@@ -101,10 +101,11 @@ "metadata": {},
    "cell_type": "code",
    "source": [
-    "plot(Normal(10, 2, transformation=LIN))\n",
-    "plot(Normal(10, 2, transformation=LOG))\n",
+    "plot(Prior(NORMAL, (10, 2), transformation=LIN))\n",
+    "plot(Prior(NORMAL, (10, 2), transformation=LOG))\n",
+    "\n",
     "# Note that the log-normal distribution is different from a log-transformed normal distribution:\n",
-    "plot(LogNormal(10, 2, transformation=LIN))"
+    "plot(Prior(LOG_NORMAL, (10, 2), transformation=LIN))"
    ],
    "id": "f6192c226f179ef9",
    "outputs": [],
    "execution_count": null
@@ -120,8 +121,8 @@ "metadata": {},
    "cell_type": "code",
    "source": [
-    "plot(LogNormal(10, 2, transformation=LOG))\n",
-    "plot(ParameterScaleNormal(10, 2))"
+    "plot(Prior(LOG_NORMAL, (10, 2), transformation=LOG))\n",
+    "plot(Prior(PARAMETER_SCALE_NORMAL, (10, 2)))"
    ],
    "id": "34c95268e8921070",
    "outputs": [],
    "execution_count": null
@@ -137,11 +138,11 @@ "metadata": {},
    "cell_type": "code",
    "source": [
-    "plot(Uniform(0, 1, transformation=LOG10))\n",
-    "plot(ParameterScaleUniform(0, 1, transformation=LOG10))\n",
+    "plot(Prior(UNIFORM, (0.01, 2), transformation=LOG10))\n",
+    "plot(Prior(PARAMETER_SCALE_UNIFORM, (0.01, 2), transformation=LOG10))\n",
     "\n",
-    "plot(Uniform(0, 1, transformation=LIN))\n",
-    "plot(ParameterScaleUniform(0, 1, transformation=LIN))\n"
+    "plot(Prior(UNIFORM, (0.01, 2), transformation=LIN))\n",
+    "plot(Prior(PARAMETER_SCALE_UNIFORM, (0.01, 2), transformation=LIN))\n"
    ],
    "id": "5ca940bc24312fc6",
    "outputs": [],
    "execution_count": null
@@ -157,8 +158,8 @@ "metadata": {},
    "cell_type": "code",
    "source": [
-    "plot(Normal(0, 1, bounds=(-4, 4))) # negligible clipping-bias at 4 sigma\n",
-    "plot(Uniform(0, 1, bounds=(0.1, 0.9))) # significant clipping-bias"
+    "plot(Prior(NORMAL, (0, 1), bounds=(-4, 4))) # negligible clipping-bias at 4 sigma\n",
+    "plot(Prior(UNIFORM, (0, 1), bounds=(0.1, 0.9))) # significant clipping-bias"
    ],
    "id": "4ac42b1eed759bdd",
    "outputs": [],
    "execution_count": null
@@ -174,8 +175,10 @@ "metadata": {},
    "cell_type": "code",
    "source": [
-    "plot(Normal(10, 1, bounds=(6, 14), transformation=\"log10\"))\n",
-    "plot(ParameterScaleNormal(10, 1, bounds=(10**6, 10**14), transformation=\"log10\"))\n"
+    "plot(Prior(NORMAL, (10, 1), bounds=(6, 14), transformation=\"log10\"))\n",
+    "plot(Prior(PARAMETER_SCALE_NORMAL, (10, 1), bounds=(10**6, 10**14), transformation=\"log10\"))\n",
+    "plot(Prior(LAPLACE, (10, 2), bounds=(6, 14)))\n",
+    "\n"
    ],
    "id": "581e1ac431860419",
    "outputs": [],
    "execution_count": null
@@ -185,7 +188,7 @@ "metadata": {},
    "cell_type": "code",
    "source": "",
-   "id": "802a64be56a6c94f",
+   "id": "633733651bbc3ef0",
    "outputs": [],
    "execution_count": null
  }
diff --git a/petab/v1/C.py b/petab/v1/C.py
index a013a0cc..be044a5c 100644
--- a/petab/v1/C.py
+++ b/petab/v1/C.py
@@ -173,7 +173,8 @@ LOG10 = "log10"
 
 #: Supported observable transformations
 OBSERVABLE_TRANSFORMATIONS = [LIN, LOG, LOG10]
-
+#: Supported parameter transformations
+PARAMETER_SCALES = [LIN, LOG, LOG10]
 
 
 # NOISE MODELS
diff --git a/petab/v1/distributions.py b/petab/v1/distributions.py
index eea34e96..be7bad89 100644
--- a/petab/v1/distributions.py
+++ b/petab/v1/distributions.py
@@ -2,82 +2,23 @@
 from __future__ import annotations
 
 import abc
-from typing import Literal
 
 import numpy as np
-import pandas as pd
-from scipy.stats import laplace, lognorm, norm, uniform
-
-from . import C
-from .parameters import scale, unscale
+from scipy.stats import laplace, lognorm, loguniform, norm, uniform
 
 __all__ = [
     "Distribution",
     "Normal",
     "LogNormal",
     "Uniform",
+    "LogUniform",
     "Laplace",
     "LogLaplace",
-    "ParameterScaleNormal",
-    "ParameterScaleUniform",
-    "ParameterScaleLaplace",
 ]
 
 
 class Distribution(abc.ABC):
-    """A univariate probability distribution.
-
-    :param type_: The type of the distribution.
-    :param transformation: The transformation to be applied to the sample.
-        Ignored if `parameter_scale` is `True`.
-    :param parameters: The parameters of the distribution (unaffected by
-        `parameter_scale` and `transformation`).
-    :param bounds: The untransformed bounds of the sample (lower, upper).
-    :param parameter_scale: Whether the parameters are already on the correct
-        scale. If `False`, the parameters are transformed to the correct scale.
-        If `True`, the parameters are assumed to be on the correct scale and
-        no transformation is applied.
-    :param transformation: The transformation of the distribution.
-
-    """
-
-    #: Mapping from distribution type to distribution class for factory method.
-    _type_to_cls: dict[str, type[Distribution]] = {}
-
-    def __init__(
-        self,
-        type_: str,
-        transformation: str,
-        parameters: tuple,
-        bounds: tuple = None,
-        parameter_scale: bool = False,
-    ):
-        if type_ not in self._type_to_cls:
-            raise ValueError(f"Unknown distribution type: {type_}")
-
-        if len(parameters) != 2:
-            raise ValueError(
-                f"Expected two parameters, got {len(parameters)}: {parameters}"
-            )
-
-        if bounds is not None and len(bounds) != 2:
-            raise ValueError(
-                f"Expected two bounds, got {len(bounds)}: {bounds}"
-            )
-
-        self.type = type_
-        self.parameters = parameters
-        self.bounds = bounds
-        self.transformation = transformation
-        self.parameter_scale = parameter_scale
-
-    def __repr__(self):
-        return (
-            f"{self.__class__.__name__}("
-            f"{self.parameters[0]!r}, {self.parameters[1]!r},"
-            f" bounds={self.bounds!r}, transformation={self.transformation!r}"
-            ")"
-        )
+    """A univariate probability distribution."""
 
     @abc.abstractmethod
     def sample(self, shape=None) -> np.ndarray:
@@ -88,115 +29,15 @@ def sample(self, shape=None) -> np.ndarray:
         """
         ...
 
-    def _scale_sample(self, sample):
-        """Scale the sample to the parameter space"""
-        if self.parameter_scale:
-            return sample
-
-        return scale(sample, self.transformation)
-
-    def _clip_to_bounds(self, x):
-        """Clip `x` values to bounds.
-
-        :param x: The values to clip. Assumed to be on the parameter scale.
-        """
-        # TODO: replace this by proper truncation
-        if self.bounds is None:
-            return x
-
-        return np.maximum(
-            np.minimum(self.ub_scaled, x),
-            self.lb_scaled,
-        )
-
-    @property
-    def lb_scaled(self):
-        """The lower bound on the parameter scale."""
-        return scale(self.bounds[0], self.transformation)
-
-    @property
-    def ub_scaled(self):
-        """The upper bound on the parameter scale."""
-        return scale(self.bounds[1], self.transformation)
-
     @abc.abstractmethod
-    def _pdf(self, x):
+    def pdf(self, x):
         """Probability density function at x.
 
         :param x: The value at which to evaluate the PDF.
-            ``x`` is assumed to be on the linear scale.
         :return: The value of the PDF at ``x``.
         """
         ...
 
-    def pdf(self, x):
-        """Probability density function at x.
-
-        :param x: The value at which to evaluate the PDF.
-            ``x`` is assumed to be on the parameter scale.
-        :return: The value of the PDF at ``x``. Note that the PDF does
-            currently not account for the clipping at the bounds.
-        """
-        x = x if self.parameter_scale else unscale(x, self.transformation)
-
-        # scale the PDF to the parameter scale
-        if self.parameter_scale or self.transformation == C.LIN:
-            coeff = 1
-        elif self.transformation == C.LOG10:
-            coeff = x * np.log(10)
-        elif self.transformation == C.LOG:
-            coeff = x
-        else:
-            raise ValueError(f"Unknown transformation: {self.transformation}")
-
-        return self._pdf(x) * coeff
-
-    def neglogprior(self, x):
-        """Negative log-prior at x.
-
-        :param x: The value at which to evaluate the negative log-prior.
-            ``x`` is assumed to be on the parameter scale.
-        :return: The negative log-prior at ``x``.
-        """
-        return -np.log(self.pdf(x))
-
-    @staticmethod
-    def from_par_dict(
-        d, type_=Literal["initialization", "objective"]
-    ) -> Distribution:
-        """Create a distribution from a row of the parameter table.
-
-        :param d: A dictionary representing a row of the parameter table.
-        :param type_: The type of the distribution.
-        :return: A distribution object.
- """ - dist_type = d.get(f"{type_}PriorType", C.NORMAL) - if not isinstance(dist_type, str) and np.isnan(dist_type): - dist_type = C.PARAMETER_SCALE_UNIFORM - cls = Distribution._type_to_cls[dist_type] - - pscale = d.get(C.PARAMETER_SCALE, C.LIN) - if ( - pd.isna(d[f"{type_}PriorParameters"]) - and dist_type == C.PARAMETER_SCALE_UNIFORM - ): - params = ( - scale(d[C.LOWER_BOUND], pscale), - scale(d[C.UPPER_BOUND], pscale), - ) - else: - params = tuple( - map( - float, - d[f"{type_}PriorParameters"].split(C.PARAMETER_SEPARATOR), - ) - ) - return cls( - *params, - bounds=(d[C.LOWER_BOUND], d[C.UPPER_BOUND]), - transformation=pscale, - ) - class Normal(Distribution): """A normal distribution.""" @@ -205,57 +46,69 @@ def __init__( self, mean: float, std: float, - bounds: tuple = None, - transformation: str = C.LIN, - _type=C.NORMAL, - _parameter_scale=False, + truncation: tuple[float, float] | None = None, ): - super().__init__( - transformation=transformation, - parameters=(mean, std), - bounds=bounds, - parameter_scale=_parameter_scale, - type_=_type, + super().__init__() + self._mean = mean + self._std = std + self._truncation = truncation + + if truncation is not None: + raise NotImplementedError("Truncation is not yet implemented.") + + def __repr__(self): + return ( + f"Normal(mean={self._mean}, std={self._std}, " + f"truncation={self._truncation})" ) def sample(self, shape=None): - sample = np.random.normal( - loc=self.parameters[0], scale=self.parameters[1], size=shape - ) - return self._clip_to_bounds(self._scale_sample(sample)) + return np.random.normal(loc=self._mean, scale=self._std, size=shape) - def _pdf(self, x): - return norm.pdf(x, loc=self.parameters[0], scale=self.parameters[1]) + def pdf(self, x): + return norm.pdf(x, loc=self._mean, scale=self._std) class LogNormal(Distribution): - """A log-normal distribution.""" + """A log-normal distribution. + + :param mean: The mean of the underlying normal distribution. + :param std: The standard deviation of the underlying normal distribution. 
+ + """ def __init__( self, mean: float, std: float, - bounds: tuple = None, - transformation: str = C.LIN, + truncation: tuple[float, float] | None = None, + base: float = np.exp(1), ): - super().__init__( - C.LOG_NORMAL, - transformation=transformation, - parameters=(mean, std), - bounds=bounds, - parameter_scale=False, + super().__init__() + self._mean = mean + self._std = std + self._truncation = truncation + self._base = base + + if truncation is not None: + raise NotImplementedError("Truncation is not yet implemented.") + + if base != np.exp(1): + raise NotImplementedError("Only base e is supported.") + + def __repr__(self): + return ( + f"LogNormal(mean={self._mean}, std={self._std}, " + f"base={self._base}, truncation={self._truncation})" ) def sample(self, shape=None): - sample = np.random.lognormal( - mean=self.parameters[0], sigma=self.parameters[1], size=shape + return np.random.lognormal( + mean=self._mean, sigma=self._std, size=shape ) - return self._clip_to_bounds(self._scale_sample(sample)) - def _pdf(self, x): - return lognorm.pdf( - x, scale=np.exp(self.parameters[0]), s=self.parameters[1] - ) + def pdf(self, x): + return lognorm.pdf(x, scale=np.exp(self._mean), s=self._std) class Uniform(Distribution): @@ -263,63 +116,81 @@ class Uniform(Distribution): def __init__( self, - lower_bound: float, - upper_bound: float, - bounds: tuple = None, - transformation: str = C.LIN, - _type=C.UNIFORM, - _parameter_scale=False, + low: float, + high: float, ): - super().__init__( - _type, - transformation=transformation, - parameters=(lower_bound, upper_bound), - bounds=bounds, - parameter_scale=_parameter_scale, - ) + super().__init__() + self._low = low + self._high = high + + def __repr__(self): + return f"Uniform(low={self._low}, high={self._high})" def sample(self, shape=None): - sample = np.random.uniform( - low=self.parameters[0], high=self.parameters[1], size=shape + return np.random.uniform(low=self._low, high=self._high, size=shape) + + def pdf(self, x): + return uniform.pdf(x, loc=self._low, scale=self._high - self._low) + + +class LogUniform(Distribution): + """A log-uniform distribution. + + :param low: The lower bound of the underlying normal distribution. + :param high: The upper bound of the underlying normal distribution. 
+ """ + + def __init__( + self, + low: float, + high: float, + base: float = np.exp(1), + ): + super().__init__() + self._low = low + self._high = high + self._base = base + # re-scaled distribution parameters as required by + # scipy.stats.loguniform + self._low_internal = np.exp(np.log(base) * low) + self._high_internal = np.exp(np.log(base) * high) + + def __repr__(self): + return ( + f"LogUniform(low={self._low}, high={self._high}, " + f"base={self._base})" ) - return self._clip_to_bounds(self._scale_sample(sample)) - def _pdf(self, x): - return uniform.pdf( - x, - loc=self.parameters[0], - scale=self.parameters[1] - self.parameters[0], + def sample(self, shape=None): + return loguniform.rvs( + self._low_internal, self._high_internal, size=shape ) + def pdf(self, x): + return loguniform.pdf(x, self._low_internal, self._high_internal) + class Laplace(Distribution): """A Laplace distribution.""" def __init__( self, - mean: float, + loc: float, scale: float, - bounds: tuple = None, - transformation: str = C.LIN, - _type=C.LAPLACE, - _parameter_scale=False, + truncation: tuple[float, float] | None = None, ): - super().__init__( - _type, - transformation=transformation, - parameters=(mean, scale), - bounds=bounds, - parameter_scale=_parameter_scale, - ) + super().__init__() + self._loc = loc + self._scale = scale + self._truncation = truncation + if truncation is not None: + raise NotImplementedError("Truncation is not yet implemented.") def sample(self, shape=None): - sample = np.random.laplace( - loc=self.parameters[0], scale=self.parameters[1], size=shape - ) - return self._clip_to_bounds(self._scale_sample(sample)) + return np.random.laplace(loc=self._loc, scale=self._scale, size=shape) - def _pdf(self, x): - return laplace.pdf(x, loc=self.parameters[0], scale=self.parameters[1]) + def pdf(self, x): + return laplace.pdf(x, loc=self._loc, scale=self._scale) class LogLaplace(Distribution): @@ -327,110 +198,45 @@ class LogLaplace(Distribution): def __init__( self, - mean: float, + loc: float, scale: float, - bounds: tuple = None, - transformation: str = C.LIN, + truncation: tuple[float, float] | None = None, + base: float = np.exp(1), ): - super().__init__( - C.LOG_LAPLACE, - transformation=transformation, - parameters=(mean, scale), - bounds=bounds, - parameter_scale=False, + super().__init__() + self._loc = loc + self._scale = scale + self._truncation = truncation + self._base = base + if truncation is not None: + raise NotImplementedError("Truncation is not yet implemented.") + if base != np.exp(1): + raise NotImplementedError("Only base e is supported.") + + def __repr__(self): + return ( + f"LogLaplace(loc={self._loc}, scale={self._scale}, " + f"base={self._base}, truncation={self._truncation})" ) @property - def mean(self): + def loc(self): """The mean of the underlying Laplace distribution.""" - return self.parameters[0] + return self._loc @property def scale(self): """The scale of the underlying Laplace distribution.""" - return self.parameters[1] + return self._scale def sample(self, shape=None): - sample = np.exp( - np.random.laplace(loc=self.mean, scale=self.scale, size=shape) + return np.exp( + np.random.laplace(loc=self._loc, scale=self._scale, size=shape) ) - return self._clip_to_bounds(self._scale_sample(sample)) - def _pdf(self, x): + def pdf(self, x): return ( 1 / (2 * self.scale * x) - * np.exp(-np.abs(np.log(x) - self.mean) / self.scale) + * np.exp(-np.abs(np.log(x) - self._loc) / self._scale) ) - - -class ParameterScaleNormal(Normal): - """A normal distribution with 
parameters on the parameter scale.""" - - def __init__( - self, - mean: float, - std: float, - bounds: tuple = None, - transformation: str = C.LIN, - ): - super().__init__( - transformation=transformation, - bounds=bounds, - mean=mean, - std=std, - _type=C.PARAMETER_SCALE_NORMAL, - _parameter_scale=True, - ) - - -class ParameterScaleUniform(Uniform): - """A uniform distribution with parameters on the parameter scale.""" - - def __init__( - self, - lower_bound: float, - upper_bound: float, - bounds: tuple = None, - transformation: str = C.LIN, - ): - super().__init__( - transformation=transformation, - lower_bound=lower_bound, - upper_bound=upper_bound, - bounds=bounds, - _type=C.PARAMETER_SCALE_UNIFORM, - _parameter_scale=True, - ) - - -class ParameterScaleLaplace(Laplace): - """A Laplace distribution with parameters on the parameter scale.""" - - def __init__( - self, - mean: float, - scale: float, - bounds: tuple = None, - transformation: str = C.LIN, - ): - super().__init__( - _type=C.PARAMETER_SCALE_LAPLACE, - transformation=transformation, - mean=mean, - scale=scale, - bounds=bounds, - _parameter_scale=True, - ) - - -Distribution._type_to_cls = { - C.NORMAL: Normal, - C.LOG_NORMAL: LogNormal, - C.UNIFORM: Uniform, - C.LAPLACE: Laplace, - C.LOG_LAPLACE: LogLaplace, - C.PARAMETER_SCALE_NORMAL: ParameterScaleNormal, - C.PARAMETER_SCALE_UNIFORM: ParameterScaleUniform, - C.PARAMETER_SCALE_LAPLACE: ParameterScaleLaplace, -} diff --git a/petab/v1/priors.py b/petab/v1/priors.py index 52fec20d..554a22a7 100644 --- a/petab/v1/priors.py +++ b/petab/v1/priors.py @@ -1,5 +1,8 @@ """Functions related to prior handling.""" +from __future__ import annotations + import copy +from typing import Literal import numpy as np import pandas as pd @@ -29,12 +32,225 @@ PARAMETER_SEPARATOR, SIMULATION_CONDITION_ID, TIME, + C, Problem, ) +from .distributions import * +from .parameters import scale, unscale __all__ = ["priors_to_measurements"] +class Prior: + """A PEtab parameter prior. + + Different from the general :class:`Distribution`, this class is used to + represent the prior distribution of a PEtab parameter using the + PEtab-specific options like `parameterScale`, `*PriorType`, + `*PriorParameters`, and `lowerBound` / `upperBounds`. + + :param type_: The type of the distribution. + :param transformation: The transformation to be applied to the sample. + Ignored if `parameter_scale` is `True`. + :param parameters: The parameters of the distribution (unaffected by + `parameter_scale` and `transformation`, but in the case of + `parameterScale*` distribution types, the parameters are assumed to be + on the `parameter_scale` scale). + :param bounds: The untransformed bounds of the sample (lower, upper). + :param transformation: The transformation of the distribution. 
+ """ + + def __init__( + self, + type_: str, + parameters: tuple, + bounds: tuple = None, + transformation: str = C.LIN, + ): + if transformation not in C.PARAMETER_SCALES: + raise ValueError( + f"Unknown parameter transformation: {transformation}" + ) + + if len(parameters) != 2: + raise ValueError( + f"Expected two parameters, got {len(parameters)}: {parameters}" + ) + + if bounds is not None and len(bounds) != 2: + raise ValueError( + "Expected (lowerBound, upperBound), got " + f"{len(bounds)}: {bounds}" + ) + + self._type = type_ + self._parameters = parameters + self._bounds = bounds + self._transformation = transformation + + # create the underlying distribution + match type_, transformation: + case (C.UNIFORM, _) | (C.PARAMETER_SCALE_UNIFORM, C.LIN): + self.distribution = Uniform(*parameters) + case (C.NORMAL, _) | (C.PARAMETER_SCALE_NORMAL, C.LIN): + self.distribution = Normal(*parameters) + case (C.LAPLACE, _) | (C.PARAMETER_SCALE_LAPLACE, C.LIN): + self.distribution = Laplace(*parameters) + case (C.PARAMETER_SCALE_UNIFORM, C.LOG): + self.distribution = LogUniform(*parameters) + case (C.LOG_NORMAL, _) | (C.PARAMETER_SCALE_NORMAL, C.LOG): + self.distribution = LogNormal(*parameters) + case (C.LOG_LAPLACE, _) | (C.PARAMETER_SCALE_LAPLACE, C.LOG): + self.distribution = LogLaplace(*parameters) + case (C.PARAMETER_SCALE_UNIFORM, C.LOG10): + self.distribution = LogUniform(*parameters, base=10) + case (C.PARAMETER_SCALE_NORMAL, C.LOG10): + self.distribution = LogNormal( + np.log(10) * parameters[0], np.log(10) * parameters[1] + ) + case (C.PARAMETER_SCALE_LAPLACE, C.LOG10): + self.distribution = LogLaplace( + np.log(10) * parameters[0], np.log(10) * parameters[1] + ) + case _: + raise ValueError( + "Unsupported distribution type / transformation: " + f"{type_} / {transformation}" + ) + + def __repr__(self): + return ( + f"{self.__class__.__name__}(" + f"{self.type!r}, {self.parameters!r}," + f" bounds={self.bounds!r}, transformation={self.transformation!r}," + ")" + ) + + @property + def type(self): + return self._type + + @property + def parameters(self): + return self._parameters + + @property + def bounds(self): + return self._bounds + + @property + def transformation(self): + return self._transformation + + def sample(self, shape=None) -> np.ndarray: + """Sample from the distribution. + + :param shape: The shape of the sample. + :return: A sample from the distribution. + """ + raw_sample = self.distribution.sample(shape) + return self._clip_to_bounds(self._scale_sample(raw_sample)) + + def _scale_sample(self, sample): + """Scale the sample to the parameter space""" + # if self.on_parameter_scale: + # return sample + + return scale(sample, self.transformation) + + def _clip_to_bounds(self, x): + """Clip `x` values to bounds. + + :param x: The values to clip. Assumed to be on the parameter scale. + """ + # TODO: replace this by proper truncation + if self.bounds is None: + return x + + return np.maximum( + np.minimum(self.ub_scaled, x), + self.lb_scaled, + ) + + @property + def lb_scaled(self): + """The lower bound on the parameter scale.""" + return scale(self.bounds[0], self.transformation) + + @property + def ub_scaled(self): + """The upper bound on the parameter scale.""" + return scale(self.bounds[1], self.transformation) + + def pdf(self, x): + """Probability density function at x. + + :param x: The value at which to evaluate the PDF. + ``x`` is assumed to be on the parameter scale. + :return: The value of the PDF at ``x``. 
Note that the PDF does + currently not account for the clipping at the bounds. + """ + x = unscale(x, self.transformation) + + # scale the PDF to the parameter scale + if self.transformation == C.LIN: + coeff = 1 + elif self.transformation == C.LOG10: + coeff = x * np.log(10) + elif self.transformation == C.LOG: + coeff = x + else: + raise ValueError(f"Unknown transformation: {self.transformation}") + + return self.distribution.pdf(x) * coeff + + def neglogprior(self, x): + """Negative log-prior at x. + + :param x: The value at which to evaluate the negative log-prior. + ``x`` is assumed to be on the parameter scale. + :return: The negative log-prior at ``x``. + """ + return -np.log(self.pdf(x)) + + @staticmethod + def from_par_dict( + d, type_=Literal["initialization", "objective"] + ) -> Prior: + """Create a distribution from a row of the parameter table. + + :param d: A dictionary representing a row of the parameter table. + :param type_: The type of the distribution. + :return: A distribution object. + """ + dist_type = d.get(f"{type_}PriorType", C.PARAMETER_SCALE_UNIFORM) + if not isinstance(dist_type, str) and np.isnan(dist_type): + dist_type = C.PARAMETER_SCALE_UNIFORM + + pscale = d.get(C.PARAMETER_SCALE, C.LIN) + if ( + pd.isna(d[f"{type_}PriorParameters"]) + and dist_type == C.PARAMETER_SCALE_UNIFORM + ): + params = ( + scale(d[C.LOWER_BOUND], pscale), + scale(d[C.UPPER_BOUND], pscale), + ) + else: + params = tuple( + map( + float, + d[f"{type_}PriorParameters"].split(C.PARAMETER_SEPARATOR), + ) + ) + return Prior( + type_=dist_type, + parameters=params, + bounds=(d[C.LOWER_BOUND], d[C.UPPER_BOUND]), + transformation=pscale, + ) + + def priors_to_measurements(problem: Problem): """Convert priors to measurements. diff --git a/petab/v1/sampling.py b/petab/v1/sampling.py index c24b3c51..a046879f 100644 --- a/petab/v1/sampling.py +++ b/petab/v1/sampling.py @@ -6,7 +6,6 @@ import pandas as pd from .C import * # noqa: F403 -from .distributions import Distribution __all__ = ["sample_from_prior", "sample_parameter_startpoints"] @@ -24,11 +23,13 @@ def sample_from_prior( Returns: Array with sampled values """ + from .priors import Prior + # unpack info p_type, p_params, scaling, bounds = prior - prior_cls = Distribution._type_to_cls[p_type] - prior = prior_cls(*p_params, bounds=tuple(bounds), transformation=scaling) - + prior = Prior( + p_type, tuple(p_params), bounds=tuple(bounds), transformation=scaling + ) return prior.sample(shape=(n_starts,)) @@ -53,6 +54,8 @@ def sample_parameter_startpoints( Array of sampled starting points with dimensions `n_startpoints` x `n_optimization_parameters` """ + from .priors import Prior + if seed is not None: np.random.seed(seed) @@ -71,9 +74,7 @@ def sample_parameter_startpoints( # get types and parameters of priors from dataframe return np.array( [ - Distribution.from_par_dict(row, type_="initialization").sample( - n_starts - ) + Prior.from_par_dict(row, type_="initialization").sample(n_starts) for row in par_to_estimate.to_dict("records") ] ).T diff --git a/tests/v1/test_distributions.py b/tests/v1/test_distributions.py index 645b0339..c37ef2b0 100644 --- a/tests/v1/test_distributions.py +++ b/tests/v1/test_distributions.py @@ -17,11 +17,9 @@ Normal(1, 1), LogNormal(2, 1), Uniform(2, 4), + LogUniform(1, 2), Laplace(1, 2), LogLaplace(1, 0.5), - ParameterScaleNormal(1, 1), - ParameterScaleLaplace(1, 2), - ParameterScaleUniform(1, 2), ], [LIN, LOG, LOG10], ) diff --git a/tests/v1/test_priors.py b/tests/v1/test_priors.py index 73d76fb4..d98879e3 100644 
--- a/tests/v1/test_priors.py
+++ b/tests/v1/test_priors.py
@@ -1,10 +1,13 @@
 from copy import deepcopy
+from itertools import product
 from pathlib import Path
 
 import benchmark_models_petab
 import numpy as np
 import pandas as pd
 import pytest
+from scipy.integrate import cumulative_trapezoid
+from scipy.stats import kstest
 
 import petab.v1
 from petab.v1 import (
@@ -13,11 +16,11 @@
     OBJECTIVE_PRIOR_TYPE,
     OBSERVABLE_ID,
     SIMULATION,
+    C,
     get_simulation_conditions,
     get_simulation_df,
 )
-from petab.v1.distributions import Distribution
-from petab.v1.priors import priors_to_measurements
+from petab.v1.priors import Prior, priors_to_measurements
 
 
 @pytest.mark.parametrize(
@@ -152,7 +155,7 @@ def apply_parameter_values(row):
         & petab_problem_priors.parameter_df[OBJECTIVE_PRIOR_TYPE].notna()
     ]
     priors = [
-        Distribution.from_par_dict(
+        Prior.from_par_dict(
             petab_problem_priors.parameter_df.loc[par_id], type_="objective"
         )
         for par_id in parameter_ids
@@ -166,3 +169,46 @@
     ), (llh_priors + prior_contrib, llh_measurements)
     # check that the tolerance is not too high
     assert np.abs(prior_contrib) > 1e-8 * np.abs(llh_priors)
+
+
+cases = list(
+    product(
+        [
+            (C.NORMAL, (10, 1)),
+            (C.LOG_NORMAL, (2, 1)),
+            (C.UNIFORM, (1, 2)),
+            (C.LAPLACE, (20, 2)),
+            (C.LOG_LAPLACE, (1, 0.5)),
+            (C.PARAMETER_SCALE_NORMAL, (1, 1)),
+            (C.PARAMETER_SCALE_LAPLACE, (1, 2)),
+            (C.PARAMETER_SCALE_UNIFORM, (1, 2)),
+        ],
+        C.PARAMETER_SCALES,
+    )
+)
+ids = [f"{prior_args[0]}_{transform}" for prior_args, transform in cases]
+
+
+@pytest.mark.parametrize("prior_args, transform", cases, ids=ids)
+def test_sample_matches_pdf(prior_args, transform):
+    """Test that the sample matches the PDF."""
+    np.random.seed(1)
+    N_SAMPLES = 10_000
+    prior = Prior(*prior_args, transformation=transform)
+    sample = prior.sample(N_SAMPLES)
+
+    # pdf -> cdf
+    def cdf(x):
+        return cumulative_trapezoid(prior.pdf(x), x)
+
+    # Kolmogorov-Smirnov test to check if the sample is drawn from the CDF
+    _, p = kstest(sample, cdf)
+
+    # if p < 0.05:
+    #     import matplotlib.pyplot as plt
+    #     plt.hist(sample, bins=100, density=True)
+    #     x = np.linspace(min(sample), max(sample), 100)
+    #     plt.plot(x, prior.pdf(x))
+    #     plt.show()
+
+    assert p > 0.05, (p, prior)
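
For review convenience, a minimal usage sketch of the `Prior` API introduced by this patch (not part of the diff; based only on the code added above — the parameter-table keys and the `;`-separated prior-parameter string follow the PEtab column conventions assumed by `from_par_dict`, and all numbers are purely illustrative):

    import numpy as np

    from petab.v1.C import LOG10, NORMAL
    from petab.v1.priors import Prior

    # A normal(10, 1) objective prior on the linear scale, for a parameter
    # estimated on the log10 scale with (untransformed) bounds [6, 14]:
    prior = Prior(NORMAL, (10, 1), bounds=(6, 14), transformation=LOG10)

    sample = prior.sample(1000)  # samples on the parameter (log10) scale
    density = prior.pdf(sample)  # PDF values on the same scale
    nlp = prior.neglogprior(sample)  # -log(pdf), e.g. for objective functions

    # The same prior, constructed from a parameter-table row, as done by
    # sample_parameter_startpoints():
    row = {
        "parameterScale": LOG10,
        "lowerBound": 6,
        "upperBound": 14,
        "objectivePriorType": NORMAL,
        "objectivePriorParameters": "10;1",
    }
    prior2 = Prior.from_par_dict(row, type_="objective")
    assert np.isfinite(prior2.sample(10)).all()

Note that, as in the old `Distribution`-based code, samples are clipped to the scaled bounds rather than properly truncated, while `pdf` ignores the bounds entirely — the `NotImplementedError` for `truncation` in the new distribution classes marks this as future work.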