from .base import BaseSamplingElement, Vary
import chaospy as cp
import numpy as np
import pickle
from itertools import product, chain
import logging
__author__ = "Wouter Edeling"
__copyright__ = """
Copyright 2018 Robin A. Richardson, David W. Wright
This file is part of EasyVVUQ
EasyVVUQ is free software: you can redistribute it and/or modify
it under the terms of the Lesser GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
EasyVVUQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
__license__ = "LGPL"
class SCSampler(BaseSamplingElement, sampler_name="sc_sampler"):
    """
    Stochastic Collocation sampler
    """

    def __init__(self,
                 vary=None,
                 polynomial_order=4,
                 quadrature_rule="G",
                 count=0,
                 growth=False,
                 sparse=False,
                 midpoint_level1=True,
                 dimension_adaptive=False):
        """
        Create the sampler for the Stochastic Collocation method.

        Parameters
        ----------
        vary: dict or None
            keys = parameters to be sampled, values = distributions.
        polynomial_order : int or list of int, optional
            The polynomial order, default is 4. A single int is expanded
            to the same (isotropic) order for every input parameter.
        quadrature_rule : char, optional
            The quadrature method, default is Gaussian "G".
        count : int, optional
            Sample counter, default 0.
            NOTE(review): this argument is currently unused -- self.count
            is unconditionally reset to 0 at the end of __init__. Confirm
            whether restarted campaigns rely on it.
        growth: bool, optional
            Sets the growth rule to exponential for Clenshaw Curtis quadrature,
            which makes it nested, and therefore more efficient for sparse grids.
            Default is False.
        sparse : bool, optional
            If True use sparse grid instead of normal tensor product grid,
            default is False.
        midpoint_level1 : bool, optional
            If True, level 1 of a sparse grid consists of a single midpoint
            (order-0 quadrature); otherwise level 1 uses order 1.
            Default is True.
        dimension_adaptive : bool, optional
            If True the sparse-grid levels are adapted anisotropically via
            a hierarchical error measure; otherwise the grid is isotropic.
            Default is False.
        """
        self.vary = Vary(vary)
        self.quadrature_rule = quadrature_rule
        # List of the probability distributions of uncertain parameters
        params_distribution = list(self.vary.get_values())
        # N = number of uncertain parameters
        self.N = len(params_distribution)
        logging.debug("param dist {}".format(params_distribution))
        # Multivariate distribution
        self.joint_dist = cp.J(*params_distribution)
        # The quadrature information: order, rule and sparsity.
        # An integer polynomial order is expanded to a per-parameter list.
        if isinstance(polynomial_order, int):
            logging.debug('Received integer polynomial order, assuming isotropic grid')
            self.polynomial_order = [polynomial_order for i in range(self.N)]
        else:
            self.polynomial_order = polynomial_order
        self.quad_rule = quadrature_rule
        self.sparse = sparse
        # determines how many points the 1st level of a sparse grid will have.
        # If midpoint_level1 = True, order 0 quadrature will be generated
        self.midpoint_level1 = midpoint_level1
        # determines whether to use an isotropic sparse grid, or to adapt
        # the levels in the sparse grid based on a hierarchical error measure
        self.dimension_adaptive = dimension_adaptive
        # number of times the grid has been adapted (see look_ahead)
        self.nadaptations = 0
        self.quad_sparse = sparse
        self.growth = growth
        self.params_distribution = params_distribution
        # cap the quadrature level of discrete inputs (sets self.max_level)
        self.check_max_quad_level()
        # determine if a nested sparse grid is used; only certain
        # rule/growth combinations produce nested 1D rules
        if self.sparse is True and self.growth is True and \
                (self.quad_rule == "C" or self.quad_rule == "clenshaw_curtis"):
            self.nested = True
        elif self.sparse is True and self.growth is False and self.quad_rule == "gauss_patterson":
            self.nested = True
        elif self.sparse is True and self.growth is True and self.quad_rule == "newton_cotes":
            self.nested = True
        else:
            self.nested = False
        # L = level of (sparse) grid
        self.L = np.max(self.polynomial_order)
        # compute the 1D collocation points (and quad weights)
        self.compute_1D_points_weights(self.L, self.N)
        # compute N-dimensional collocation points
        if not self.sparse:
            # generate collocation grid locally: a single tensor product
            # of the per-parameter 1D rules
            l_norm = np.array([self.polynomial_order])
            self.xi_d = self.generate_grid(l_norm)
        # sparse grid = a linear combination of tensor products of 1D rules
        # of different order. Use chaospy to compute these 1D quadrature rules
        else:
            self.l_norm = self.compute_sparse_multi_idx(self.L, self.N)
            # create sparse grid of dimension N and level q using the 1d
            # rules in self.xi_1d
            self.xi_d = self.generate_grid(self.l_norm)
        self._n_samples = self.xi_d.shape[0]
        # index of the next collocation point handed out by __next__
        self.count = 0
@property
def analysis_class(self):
"""Return a corresponding analysis class.
"""
from easyvvuq.analysis import SCAnalysis
return SCAnalysis
[docs] def compute_1D_points_weights(self, L, N):
"""
Computes 1D collocation points and quad weights,
and stores this in self.xi_1d, self.wi_1d.
Parameters
----------
L : (int) the max level of the (sparse) grid
N : (int) the number of uncertain parameters
Returns
-------
None.
"""
# for every dimension (parameter), create a hierachy of 1D
# quadrature rules of increasing order
self.xi_1d = [{} for n in range(N)]
self.wi_1d = [{} for n in range(N)]
if self.sparse:
# if level one of the sparse grid is a midpoint rule, generate
# the quadrature with order 0 (1 quad point). Else set order at
# level 1 to 1
if self.midpoint_level1:
j = 0
else:
j = 1
for n in range(N):
# check if input is discrete uniform, in which case the
# rule and growth flag must be modified
if isinstance(self.params_distribution[n], cp.DiscreteUniform):
rule = "discrete"
else:
rule = self.quad_rule
for i in range(L):
xi_i, wi_i = cp.generate_quadrature(i + j,
self.params_distribution[n],
rule=rule,
growth=self.growth)
self.xi_1d[n][i + 1] = xi_i[0]
self.wi_1d[n][i + 1] = wi_i
else:
for n in range(N):
# check if input is discrete uniform, in which case the
# rule flag must be modified
if isinstance(self.params_distribution[n], cp.DiscreteUniform):
rule = "discrete"
else:
rule = self.quad_rule
xi_i, wi_i = cp.generate_quadrature(self.polynomial_order[n],
self.params_distribution[n],
rule=rule,
growth=self.growth)
self.xi_1d[n][self.polynomial_order[n]] = xi_i[0]
self.wi_1d[n][self.polynomial_order[n]] = wi_i
    def check_max_quad_level(self):
        """
        If a discrete variable is specified, there is the possibility of
        non unique collocation points if the quadrature order is high enough.
        This subroutine prevents that.

        NOTE: Only detects cp.DiscreteUniform thus far

        The max quad levels are stored in self.max_level.

        Returns
        -------
        None
        """
        # assume no maximum by default
        self.max_level = np.ones(self.N) * 1000
        for n in range(self.N):
            # if a discrete uniform is specified check max order
            if isinstance(self.params_distribution[n], cp.DiscreteUniform):
                # if level one of the sparse grid is a midpoint rule, generate
                # the quadrature with order 0 (1 quad point). Else set order at
                # level 1 to 1
                if self.midpoint_level1:
                    j = 0
                else:
                    j = 1
                number_of_points = 0
                for order in range(1000):
                    # NOTE(review): no rule= is passed here, so chaospy's
                    # default rule is used, while compute_1D_points_weights
                    # uses rule="discrete" for this same input -- confirm the
                    # maximum detected here matches the rule actually sampled.
                    xi_i, wi_i = cp.generate_quadrature(order + j,
                                                        self.params_distribution[n],
                                                        growth=self.growth)
                    # if the quadrature points no longer grow with the quad order,
                    # then the max order has been reached
                    if xi_i.size == number_of_points:
                        break
                    number_of_points = xi_i.size
                logging.debug("Input %d is discrete, setting max quadrature order to %d"
                              % (n, order - 1))
                # level 1 = order 0 etc
                # NOTE(review): the log message above reports order - 1 but the
                # stored maximum level is order -- verify which offset is meant.
                self.max_level[n] = order
[docs] def next_level_sparse_grid(self):
"""
Adds the points of the next level for isotropic hierarchical sparse grids.
Returns
-------
None.
"""
if self.nested is False:
print('Only works for nested sparse grids')
return
self.look_ahead(self.l_norm)
self.l_norm = np.append(self.l_norm, self.admissible_idx, axis=0)
[docs] def look_ahead(self, current_multi_idx):
"""
The look-ahead step in (dimension-adaptive) sparse grid sampling. Allows
for anisotropic sampling plans.
Computes the admissible forward neighbors with respect to the current level
multi-indices. The admissible level indices l are added to self.admissible_idx.
The code will be evaluated next iteration at the new collocation points
corresponding to the levels in admissble_idx.
Source: Gerstner, Griebel, "Numerical integration using sparse grids"
Parameters
----------
current_multi_idx : array of the levels in the current iteration
of the sparse grid.
Returns
-------
None.
"""
# compute all forward neighbors for every l in current_multi_idx
forward_neighbor = []
e_n = np.eye(self.N, dtype=int)
for l in current_multi_idx:
for n in range(self.N):
# the forward neighbor is l plus a unit vector
forward_neighbor.append(l + e_n[n])
# remove duplicates
forward_neighbor = np.unique(np.array(forward_neighbor), axis=0)
# remove those which are already in the grid
forward_neighbor = setdiff2d(forward_neighbor, current_multi_idx)
# make sure the final candidates are admissible (all backward neighbors
# must be in the current multi indices)
logging.debug('Computing admissible levels...')
admissible_idx = []
for l in forward_neighbor:
admissible = True
for n in range(self.N):
backward_neighbor = l - e_n[n]
# find backward_neighbor in current_multi_idx
idx = np.where((backward_neighbor == current_multi_idx).all(axis=1))[0]
# if backward neighbor is not in the current index set
# and it is 'on the interior' (contains no 0): not admissible
if idx.size == 0 and backward_neighbor[n] != 0:
admissible = False
break
# if all backward neighbors are in the current index set: l is admissible
if admissible:
admissible_idx.append(l)
logging.debug('done')
self.admissible_idx = np.array(admissible_idx)
# make sure that all entries of each index are <= the max quadrature order
# The max quad order can be low for discrete input variables
idx = np.where((self.admissible_idx <= self.max_level).all(axis=1))[0]
self.admissible_idx = self.admissible_idx[idx]
logging.debug('Admissible multi-indices:\n%s', self.admissible_idx)
# determine the maximum level L of the new index set L = |l| - N + 1
# self.L = np.max(np.sum(self.admissible_idx, axis=1) - self.N + 1)
self.L = np.max(self.admissible_idx)
# recompute the 1D weights and collocation points
self.compute_1D_points_weights(self.L, self.N)
# compute collocation grid based on the admissible level indices
admissible_grid = self.generate_grid(self.admissible_idx)
# remove collocation points which have already been computed
if not hasattr(self, 'xi_d'):
self.xi_d = self.generate_grid(self.admissible_idx)
self._n_samples = self.xi_d.shape[0]
new_points = setdiff2d(admissible_grid, self.xi_d)
logging.debug('%d new points added' % new_points.shape[0])
# keep track of the number of points added per iteration
if not hasattr(self, 'n_new_points'):
self.n_new_points = []
self.n_new_points.append(new_points.shape[0])
# update the number of samples
self._n_samples += new_points.shape[0]
# update the N-dimensional sparse grid if unsampled points are added
if new_points.shape[0] > 0:
self.xi_d = np.concatenate((self.xi_d, new_points))
# count the number of times the dimensions were adapted
self.nadaptations += 1
[docs] def is_finite(self):
return True
@property
def n_samples(self):
"""
Number of samples (Ns) of SC method.
- When using tensor quadrature: Ns = (p + 1)**d
- When using sparid: Ns = bigO((p + 1)*log(p + 1)**(d-1))
Where: p is the polynomial degree and d is the number of
uncertain parameters.
Ref: Eck et al. 'A guide to uncertainty quantification and
sensitivity analysis for cardiovascular applications' [2016].
"""
return self._n_samples
# SC collocations points are not random, generate_runs simply returns
# one collocation point from the tensor product after the other
def __next__(self):
if self.count < self._n_samples:
run_dict = {}
i_par = 0
for param_name in self.vary.get_keys():
run_dict[param_name] = self.xi_d[self.count][i_par]
i_par += 1
self.count += 1
return run_dict
else:
raise StopIteration
[docs] def save_state(self, filename):
logging.debug("Saving sampler state to %s" % filename)
file = open(filename, 'wb')
pickle.dump(self.__dict__, file)
file.close()
[docs] def load_state(self, filename):
logging.debug("Loading sampler state from %s" % filename)
file = open(filename, 'rb')
self.__dict__ = pickle.load(file)
file.close()
"""
=========================
(SPARSE) GRID SUBROUTINES
=========================
"""
[docs] def generate_grid(self, l_norm):
dimensions = range(self.N)
H_L_N = []
# loop over all multi indices i
for l in l_norm:
# compute the tensor product of nodes indexed by i
X_l = [self.xi_1d[n][l[n]] for n in dimensions]
H_L_N.append(list(product(*X_l)))
# flatten the list of lists
H_L_N = np.array(list(chain(*H_L_N)))
# return unique nodes
return np.unique(H_L_N, axis=0)
[docs] def compute_sparse_multi_idx(self, L, N):
"""
computes all N dimensional multi-indices l = (l1,...,lN) such that
|l| <= L + N - 1, i.e. a simplex set:
3 *
2 * * (L=3 and N=2)
1 * * *
1 2 3
Here |l| is the internal sum of i (l1+...+lN)
"""
# old implementation: does not scale well
# P = np.array(list(product(range(1, L + 1), repeat=N)))
# multi_idx = P[np.where(np.sum(P, axis=1) <= L + N - 1)[0]]
# use the look_ahead subroutine to build an isotropic sparse grid (faster)
multi_idx = np.array([np.ones(self.N, dtype='int')])
for l in range(self.L - 1):
self.look_ahead(multi_idx)
# accept all admissible indices to build an isotropic grid
multi_idx = np.append(multi_idx, self.admissible_idx, axis=0)
return multi_idx
def setdiff2d(X, Y):
    """
    Computes the difference of two 2D arrays X and Y, row-wise.

    Parameters
    ----------
    X : 2D numpy array
    Y : 2D numpy array

    Returns
    -------
    The difference X \\ Y as a 2D array: the rows of X that do not occur in Y,
    in the order in which they appear in X. (fix: the previous implementation
    went through a Python set, so the row order of the result was arbitrary,
    making downstream sample ordering non-reproducible; an empty result now
    has shape (0, X.shape[1]) instead of (0,), so it remains concatenable.)
    """
    X = np.asarray(X)
    # hash the rows of Y once for O(1) membership tests
    rows_y = set(map(tuple, np.asarray(Y)))
    diff = [row for row in X if tuple(row) not in rows_y]
    if not diff:
        # preserve the column count for an empty difference
        return np.empty((0, X.shape[1]), dtype=X.dtype)
    return np.array(diff)