Source code for easyvvuq.sampling.pce

import logging
import chaospy as cp
import numpy as np
import random
from .base import BaseSamplingElement, Vary
from .transformations import Transformations

__author__ = "Jalal Lakhlili"
__copyright__ = """

    Copyright 2018 Robin A. Richardson, David W. Wright

    This file is part of EasyVVUQ

    EasyVVUQ is free software: you can redistribute it and/or modify
    it under the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    EasyVVUQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
__license__ = "LGPL"


[docs] class PCESampler(BaseSamplingElement, sampler_name="PCE_sampler"): def __init__(self, vary=None, distribution=None, count=0, polynomial_order=4, regression=False, rule="G", sparse=False, growth=False, relative_analysis=False, nominal_value=None): """ Create the sampler for the Polynomial Chaos Expansion using pseudo-spectral projection or regression (Point Collocation). Parameters ---------- vary: dict or None keys = parameters to be sampled, values = distributions. distribution: cp.Distribution or matrix-like Joint distribution specifying dependency between the parameters or correlation matrix of the parameters. Depending on the type of the argument either Rosenblatt or Cholesky transformation will be used to handle the dependent parameters. count : int, optional Specified counter for Fast forward, default is 0. polynomial_order : int, optional The polynomial order, default is 4. regression : bool, optional If True, regression variante (point collecation) will be used, otherwise projection variante (pseud-spectral) will be used. Default value is False. rule : char, optional The quadrature method, in case of projection (default is Gaussian "G"). The sequence sampler in case of regression (default is Hammersley "M") sparse : bool, optional If True, use Smolyak sparse grid instead of normal tensor product grid. Default value is False. growth (bool, None), optional If True, quadrature point became nested. relative_analysis (bool, None), optional If True, we add one additional sample with all parameters having zero (nominal) value. This is used in the relative analysis, where the model output is represented relative to the nominal output, and similarly, the parameters represent the delta of the parameter nominal value (i.e. zero represents parameter's nominal value, nominal + delta*nominal) nominal_value : dict, optional Evaluate derivative of the model at the nominal value of the parameters. It should be a dict with the keys which are present in vary. In case the base_value is None, the mean of the distribution is used (assuming cp.Normal). """ if vary is None: msg = ("'vary' cannot be None. RandomSampler must be passed a " "dict of the names of the parameters you want to vary, " "and their corresponding distributions.") logging.error(msg) raise Exception(msg) if not isinstance(vary, dict): msg = ("'vary' must be a dictionary of the names of the " "parameters you want to vary, and their corresponding " "distributions.") logging.error(msg) raise Exception(msg) if len(vary) == 0: msg = "'vary' cannot be empty." logging.error(msg) raise Exception(msg) # Remember whether to add the extra run using the base value of the parameters (0 corresponding to the mean) logging.info(f"Performing relative analysis: {relative_analysis}") self.relative_analysis = relative_analysis self.vary = Vary(vary) self.polynomial_order = polynomial_order # List of the probability distributions of uncertain parameters self.params_distribution = list(vary.values()) params_num = len(self.params_distribution) # Nominal value of the parameters if nominal_value is None: # Assumes that v is cp.Normal() if (all([type(v) == type(cp.Normal()) for v in vary.values()])): nominal_value = {k: v.get_mom_parameters()['shift'][0] for k,v in vary.items()} #Set nominal_value to the mean_of_the_parameters logging.info(f"Using parameter mean for the relative analysis {nominal_value}") else: if (len(nominal_value) != params_num): msg = ("'nominal_value' must be a 1D array of the same size as the number of parameters.") logging.error(msg) raise ValueError(msg) logging.info(f"Using user-provided nominal value for relative analysis {nominal_value}") self.nominal_value = nominal_value # Multivariate distribution, the behaviour changes based on the # 'distribution' argument, which can be: # None - use default joint # cp.Distribution - use Rosenblatt if the distribution is dependent # matrix-lie - use Cholesky self._is_dependent = False self._transformation = None self.distribution_dep = None if distribution is None: logging.info("Using default joint distribution") self.distribution = cp.J(*self.params_distribution) elif 'distributions' in str(type(distribution)): if distribution.stochastic_dependent: if not isinstance(distribution, cp.MvNormal): raise ValueError("User provided joint distribution needs to be a cp.MvNormal") if not len(distribution._parameters['mean']) == params_num: raise ValueError("User provided joint distribution does not contain all the parameters listed in vary") logging.info("Using user provided joint distribution with Rosenblatt transformation") self._is_dependent = True self._transformation = "Rosenblatt" self.distribution_dep = distribution else: logging.info("Using user provided joint distribution without any transformation") self.distribution = distribution assert(self._is_dependent == False) elif 'list' in str(type(distribution)) or 'ndarray' in str(type(distribution)): if not len(distribution) == params_num: raise ValueError("User provided correlation matrix does not contain all the parameters listed in vary") for i in range(params_num): if not distribution[i][i] == 1.0: raise ValueError("User provided correlation matrix is not a correlation matrix (diagonal elements are not 1.0)") logging.info("Using user provided correlation matrix for Cholesky transformation") self._is_dependent = True self._transformation = "Cholesky" self.distribution_dep = np.array(distribution) else: logging.error("Unsupported type of the distribution argument. It should be either cp.Distribution or a matrix-like array") exit() # Build independent joint multivariate distribution considering each uncertain paramter if not self.distribution_dep is None: params_distribution = [vary_dist for vary_dist in vary.values()] self.distribution = cp.J(*params_distribution) # This assumes that the order of the parameters in distribution and distribution_dep is the same # and the distribution type is cp.Normal for id_v, v in enumerate(vary): assert(type(vary[v]) == type(cp.Normal())) if self._transformation == "Rosenblatt": assert(vary[v].get_mom_parameters()['shift'][0] == self.distribution_dep._parameters['mean'][id_v]) assert(vary[v].get_mom_parameters()['shift'][0] == self.distribution[id_v].get_mom_parameters()['shift'][0]) logging.debug(f"The independent distribution consists of: {self.distribution}") logging.debug(f"Using parameter permutation: {list(vary.keys())}") # The orthogonal polynomials corresponding to the joint distribution self.P = cp.expansion.stieltjes(polynomial_order, self.distribution, normed=True) # The quadrature information self.quad_sparse = sparse self.rule = rule # Clenshaw-Curtis should be nested if sparse (#139 chaospy issue) self.quad_growth = growth cc = ['c', 'C', 'clenshaw_curtis', 'Clenshaw_Curtis'] if sparse and rule in cc: self.quad_growth = True # To determinate the PCE vraint to use self.regression = regression # indices of cp.DiscreteUniform parameters idx_discrete = np.where([isinstance(p, cp.DiscreteUniform) for p in self.params_distribution])[0] # Regression variante (Point collocation method) if regression: logging.info(f"Using point collocation method to create PCE") # Change the default rule if rule == "G": self.rule = "M" # Generates samples self._n_samples = 2 * len(self.P) logging.info(f"Generating {self._n_samples} samples using {self.rule} rule") self._nodes = cp.generate_samples(order=self._n_samples, domain=self.distribution, rule=self.rule) # Transform relative nodes to absolute nodes if self.relative_analysis: for pi,p in enumerate(vary.keys()): self._nodes[pi] = (1.0 + self._nodes[pi]) * nominal_value[p] self._weights = None # Nodes transformation if self._is_dependent: if self._transformation == "Rosenblatt": logging.info("Performing Rosenblatt transformation") self._nodes_dep = Transformations.rosenblatt(self._nodes, self.distribution, self.distribution_dep, regression) elif self._transformation == "Cholesky": logging.info("Performing Cholesky transformation") self._nodes_dep = Transformations.cholesky(self._nodes, self.vary, self.distribution_dep, regression) else: logging.critical("Error: How did this happen? We are transforming the nodes but not with Rosenblatt nor Cholesky") exit() # Projection variante (Pseudo-spectral method) else: if type(self.rule) is str and idx_discrete.size > 0: tmp = [] for i in range(params_num): if i in idx_discrete: tmp.append("discrete") else: tmp.append(rule) self.rule = tmp logging.info(f"Using pseudo-spectral method to create PCE") # Nodes and weights for the integration self._nodes, self._weights = cp.generate_quadrature(order=polynomial_order, dist=self.distribution, rule=self.rule, sparse=sparse, growth=self.quad_growth) # Number of samples self._n_samples = len(self._nodes[0]) logging.info(f"Generated {self._n_samples} nodes/weights pairs using {self.rule} rule") # Nodes transformation if self._is_dependent: # Scale the independent nodes raise NotImplementedError(f'Transformation of the independent nodes not supported with {regression = }') if self.relative_analysis: raise NotImplementedError(f'Transformation of the relative nodes not supported with {regression = }') # Fast forward to specified count, if possible self.count = 0 if self.count >= self._n_samples: msg = (f"Attempt to start sampler fastforwarded to count {self.count}, " f"but sampler only has {self.n_samples} samples, therefore" f"this sampler will not provide any more samples.") logging.warning(msg) else: for i in range(count): self.__next__()
[docs] def is_finite(self): return True
@property def n_samples(self): """ Number of samples (Ns) of PCE method. - When using pseudo-spectral projection method with tensored quadrature: Ns = (p + 1)**d - When using pseudo-spectral projection method with sparce grid quadratue: Ns = bigO((p + 1)*log(p + 1)**(d-1)) - When using regression method: Ns = 2*(p + d)!/p!*d! Where: p is the polynomial degree and d is the number of uncertain parameters. Ref: Eck et al. 'A guide to uncertainty quantification and sensitivity analysis for cardiovascular applications' [2016]. """ return self._n_samples @property def analysis_class(self): """Return a corresponding analysis class. """ from easyvvuq.analysis import PCEAnalysis return PCEAnalysis def __next__(self): if self.count < self._n_samples: #base Train samples used to evaluate the PCE run_dict = {} for i, param_name in enumerate(self.vary.vary_dict): # These are nodes that need to be returned as samples o be used for the model execution, # for the SA in EasyVVUQ we will use only the raw independent nodes if self._is_dependent: # Return transformed nodes reflecting the dependencies run_dict[param_name] = self._nodes_dep[i][self.count] else: current_param = self._nodes[i][self.count] # all parameters self.xi_d will store floats. If current param is # DiscreteUniform, convert e.g. 2.0 to 2 before running # the simulation. if isinstance(self.params_distribution[i], cp.DiscreteUniform): current_param = int(current_param) run_dict[param_name] = current_param self.count += 1 return run_dict elif self.relative_analysis and self.count == self._n_samples: #extra sample for the nominal case run_dict = self.nominal_value self.count += 1 return run_dict else: raise StopIteration