Source code for easyvvuq.sampling.fd

#from hashlib import shake_128
import logging
import chaospy as cp
import numpy as np
import random
from .base import BaseSamplingElement, Vary
from .transformations import Transformations


# DEBUG USI
from os import stat, path
from time import ctime
import json

__author__ = "Jalal Lakhlili"
__copyright__ = """

    Copyright 2018 Robin A. Richardson, David W. Wright

    This file is part of EasyVVUQ

    EasyVVUQ is free software: you can redistribute it and/or modify
    it under the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    EasyVVUQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
__license__ = "LGPL"


[docs] class FDSampler(BaseSamplingElement, sampler_name="FD_sampler"): def __init__(self, vary=None, distribution=None, perturbation=0.05, nominal_value=None, count=0, relative_analysis=False): """ Create the sampler for the Polynomial Chaos Expansion using pseudo-spectral projection or regression (Point Collocation). Parameters ---------- vary: dict or None keys = parameters to be sampled, values = distributions. distribution: cp.Distribution or matrix-like Joint distribution specifying dependency between the parameters or correlation matrix of the parameters. Depending on the type of the argument either Rosenblatt or Cholesky transformation will be used to handle the dependent parameters. perturbation: float Perturbation of the parameters used in the finite difference scheme nominal_value : dict, optional FD sampler perturbs the base value (NV) of the parameters by the value specified. It should be a dict with the keys which are present in vary. In case the nominal_value is None, the mean of the distribution is used (assuming cp.Normal). count : int, optional Specified counter for Fast forward, default is 0. relative_analysis : bool, optional Default False, the nodes are perturbed by the value specified in perturbation such that n = NV + perturbation. If True, the nodes are perturbed by the relative value specified in perturbation such that n = NB * (1 + perturbation). """ # Create and initialize the logger self.logger = logging.getLogger(__name__) self.logger.setLevel(logging.DEBUG) # Logger is already configured, remove all handlers if self.logger.hasHandlers(): self.logger.handlers = [] formatter = logging.Formatter('%(asctime)s:%(name)s:%(levelname)s:%(message)s') file_handler = logging.FileHandler('FD.log') file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) stream_handler = logging.StreamHandler() stream_handler.setFormatter(formatter) self.logger.addHandler(file_handler) self.logger.addHandler(stream_handler) if vary is None: msg = ("'vary' cannot be None. RandomSampler must be passed a " "dict of the names of the parameters you want to vary, " "and their corresponding distributions.") self.logger.error(msg) raise Exception(msg) if not isinstance(vary, dict): msg = ("'vary' must be a dictionary of the names of the " "parameters you want to vary, and their corresponding " "distributions.") self.logger.error(msg) raise Exception(msg) if len(vary) == 0: msg = "'vary' cannot be empty." self.logger.error(msg) raise Exception(msg) self.vary = Vary(vary) # List of the probability distributions of uncertain parameters params_distribution = list(vary.values()) params_num = len(params_distribution) # Remember whether to add the extra run self.logger.info(f"Performing relative analysis: {relative_analysis}") self.relative_analysis = relative_analysis self._perturbation = perturbation # Perturbation of the parameters if nominal_value is None: self.logger.info(f"Performing perturbation of the nodes, base value = mean, with delta = {perturbation}") # Assumes that v is cp.Normal() assert(all([type(v) == type(cp.Normal()) for v in vary.values()])) nominal_value = {k: v.get_mom_parameters()['shift'][0] for k,v in vary.items()} #Set nominal_value to the mean_of_the_parameters else: if (len(nominal_value) != params_num): msg = ("'nominal_value' must be a 1D array of the same size as the number of parameters.") self.logger.error(msg) raise ValueError(msg) self.logger.info(f"Performing perturbation of the nodes, base value = {nominal_value}, with delta = {perturbation}") # Generate the perturbed values of the parameters for the FD #FD = 0.5*(y_pos/y_base-1)/(delta) + 0.5*(y_neg/y_base - 1)/(-delta) self.generate_nodes(nominal_value, vary, distribution) # Fast forward to specified count, if possible self.count = 0 if self.count >= self._n_samples: msg = (f"Attempt to start sampler fastforwarded to count {self.count}, " f"but sampler only has {self.n_samples} samples, therefore" f"this sampler will not provide any more samples.") self.logger.warning(msg) else: for i in range(count): self.__next__()
[docs] def permute_problem(self, perm, mean, sigma, cov): # Apply selected permutation to the problem nParams = len(mean) # cyclic permutation i = perm_step, N = nParams # perm = [i i+1 ... N-1 0 1 ... i-1] assert(len(perm) == nParams) # create permutation matrix P = np.eye(nParams)[perm] # Model properties that need to be permuted mean_ = P@mean sigma_ = P@sigma cov_ = np.matmul(P, np.matmul(cov, P.transpose())) return mean_, sigma_, cov_
[docs] def generate_nodes(self, nominal_value, vary, distribution): params_num = len(nominal_value) self._n_samples = 2*params_num + 1 # Multivariate distribution, the behaviour changes based on the # 'distribution' argument, which can be: # None - use default joint # cp.Distribution - use Rosenblatt if the distribution is dependent # matrix-like - use Cholesky self._is_dependent = False self._transformation = None self.distribution_dep = None if 'distributions' in str(type(distribution)): if not distribution.stochastic_dependent: raise ValueError("User provided joint distribution needs to contain dependency between the parameters") if not isinstance(distribution, cp.MvNormal): raise ValueError("User provided joint distribution needs to be a cp.MvNormal") if not len(distribution._parameters['mean']) == params_num: raise ValueError("User provided joint distribution does not contain all the parameters listed in vary") self.logger.info("Using user provided joint distribution with Rosenblatt transformation") self._is_dependent = True self._transformation = "Rosenblatt" self.distribution_dep = distribution mu = distribution._parameters['mean'] cov = distribution._covariance elif 'list' in str(type(distribution)) or 'ndarray' in str(type(distribution)): if not len(distribution) == params_num: raise ValueError("User provided correlation matrix does not contain all the parameters listed in vary") for i in range(params_num): if not distribution[i][i] == 1.0: raise ValueError("User provided correlation matrix is not a correlation matrix (diagonal elements are not 1.0)") self.logger.info("Using user provided correlation matrix for Cholesky transformation") self._is_dependent = True self._transformation = "Cholesky" self.distribution_dep = np.array(distribution) elif distribution is None: pass else: raise ValueError("Unsupported type of the distribution argument. It should be either cp.Distribution or a matrix-like array") # Set up independent perturbations #G: [0, d, -d, 0, 0, 0, 0] #C: [0, 0, 0, d, -d, 0, 0] #E: [0, 0, 0, 0, 0, d, -d] self._perturbations = np.zeros((params_num, self._n_samples)) offset = 1 #the first sample is the nominal value at x0 for p in range(params_num): self._perturbations[p][offset] = + self._perturbation self._perturbations[p][offset+1] = - self._perturbation offset = offset + 2 # Convert perturbation to absolute values self._nodes = np.array([ nominal_value[p] * np.ones(self._n_samples) for p in vary.keys() ]) offset = 1 #the first sample is the nominal value at x0 for p in range(params_num): if self.relative_analysis: self._nodes[p][offset] = (1 + self._perturbations[p][offset]) * self._nodes[p][offset] self._nodes[p][offset+1] = (1 + self._perturbations[p][offset+1]) * self._nodes[p][offset+1] else: self._nodes[p][offset] = self._nodes[p][offset] + self._perturbations[p][offset] self._nodes[p][offset+1] = self._nodes[p][offset+1] + self._perturbations[p][offset+1] offset = offset + 2 self.logger.info(f"Generated {offset}/{self._n_samples} samples for the FD scheme") # Create perturbed values with correlations # dependent Nodes, where di is the induced movement of the parameter i caused by movement in d #G: [0, -d, d, 0, 0, 0, 0] #C: [0, -di, di, -d, d, 0, 0] #E: [0, -di, di, -di, di, -d, d] if self._is_dependent: # Assume permutation [0,1,2] perm = [(i) % params_num for i in range(params_num)] vary_ = {x: vary[x] for i, x in enumerate([list(vary.keys())[i] for i in perm])} if self._transformation == "Rosenblatt": self.logger.info("Performing Rosenblatt transformation") # Create the dependent distribution mean_, _, cov_ = self.permute_problem(perm, mu, np.zeros(params_num), cov) distribution_dep_ = cp.MvNormal(mean_, cov_) # Create the independent distribution params_distribution = [vary_dist for vary_dist in vary_.values()] distribution_ = cp.J(*params_distribution) # This assumes that the order of the parameters in distribution and distribution_dep is the same # and the distribution type is cp.Normal for id_v, v in enumerate(vary_): assert(type(vary_[v]) == type(cp.Normal())) assert(vary_[v].get_mom_parameters()['shift'][0] == distribution_dep_._parameters['mean'][id_v]) assert(vary_[v].get_mom_parameters()['shift'][0] == distribution_[id_v].get_mom_parameters()['shift'][0]) self.logger.debug(f"The independent distribution consists of: {distribution_}") self.logger.debug(f"Using parameter permutation: {list(vary_.keys())}") self._nodes_dep = Transformations.rosenblatt(self._nodes, distribution_, distribution_dep_) #self._perturbations_dep = Transformations.rosenblatt(self._perturbations, distribution_, distribution_dep_) elif self._transformation == "Cholesky": self.logger.info("Performing Cholesky transformation") _, _, distribution_ = self.permute_problem(perm, np.zeros(params_num), np.zeros(params_num), distribution) self._nodes_dep = Transformations.cholesky(self._nodes, vary_, distribution_) #self._perturbations_dep = Transformations.cholesky(self._perturbations, vary_, distribution_) else: self.logger.critical("Error: How did this happen? We are transforming the nodes but not with Rosenblatt nor Cholesky") exit() return
[docs] def is_finite(self): return True
@property def n_samples(self): """ Number of samples (Ns) of PCE method. - When using pseudo-spectral projection method with tensored quadrature: Ns = (p + 1)**d - When using pseudo-spectral projection method with sparce grid quadratue: Ns = bigO((p + 1)*log(p + 1)**(d-1)) - When using regression method: Ns = 2*(p + d)!/p!*d! Where: p is the polynomial degree and d is the number of uncertain parameters. Ref: Eck et al. 'A guide to uncertainty quantification and sensitivity analysis for cardiovascular applications' [2016]. """ return self._n_samples @property def analysis_class(self): """Return a corresponding analysis class. """ from easyvvuq.analysis import FDAnalysis return FDAnalysis def __next__(self): if self.count < self._n_samples: #base Train samples used to evaluate the PCE run_dict = {} for i, param_name in enumerate(self.vary.vary_dict): # These are nodes that need to be returned as samples o be used for the model execution, # for the SA in EasyVVUQ we will use only the raw independent nodes if self._is_dependent: # Return transformed nodes reflecting the dependencies run_dict[param_name] = self._nodes_dep[i][self.count] else: run_dict[param_name] = self._nodes[i][self.count] self.count += 1 return run_dict else: raise StopIteration