Source code for easyvvuq.sampling.simplex_stochastic_collocation

"""
-------------------------------------------------------------------------
THE SIMPLEX STOCASTIC COLLOCATION SAMPLER OF JEROEN WITTEVEEN (1980-2015)
-------------------------------------------------------------------------

Source:

Witteveen, J. A. S., & Iaccarino, G. (2013).
Simplex stochastic collocation with ENO-type stencil selection for robust
uncertainty quantification. Journal of Computational Physics, 239, 1-21.

Edeling, W. N., Dwight, R. P., & Cinnella, P. (2016).
Simplex-stochastic collocation method with improved scalability.
Journal of Computational Physics, 310, 301-328.

"""

from .base import BaseSamplingElement, Vary
# import chaospy as cp
import numpy as np
import pickle
from itertools import product, combinations
# import logging
from scipy.spatial import Delaunay
from scipy.special import factorial
import multiprocessing as mp


__author__ = "Wouter Edeling"
__copyright__ = """

    Copyright 2018 Robin A. Richardson, David W. Wright

    This file is part of EasyVVUQ

    EasyVVUQ is free software: you can redistribute it and/or modify
    it under the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    EasyVVUQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
__license__ = "LGPL"


[docs]class SSCSampler(BaseSamplingElement, sampler_name="ssc_sampler"): """ Simplex Stochastic Collocation sampler """ def __init__(self, vary=None, max_polynomial_order=4): """ Create an SSC sampler object. Parameters ---------- vary: dict or None keys = parameters to be sampled, values = distributions. max_polynomial_order : int, optional The maximum polynomial order, default is 4. Returns ------- None. """ # number of random inputs self.n_xi = len(vary) # vary dictionary of Chaospy input distribution self.vary = Vary(vary) # initial Delaunay triangulation self.tri = self.init_grid() # code sample counter self.count = 0 self._n_samples = self.tri.points.shape[0] self.set_pmax_cutoff(max_polynomial_order)
[docs] def init_grid(self): """ Create the initial n_xi-dimensional Delaunay discretization Returns ------- tri : scipy.spatial.qhull.Delaunay Initial triagulation of 2 ** n_xi + 1 points. """ # create inital hypercube points through n_xi dimensional # carthesian product of the lower and upper limits of the # chasopy input distributions corners = [[param.lower[0], param.upper[0]] for param in self.vary.get_values()] self.corners = corners xi_k_jl = np.array(list(product(*corners))) # add a point in the middle of the hypercube xi_k_jl = np.append(xi_k_jl, np.mean(xi_k_jl, axis=0).reshape([1, -1]), 0) if self.n_xi > 1: # Create initial Delaunay discretization """ NOTE: FOUND AN ERROR IN THE 'incremental' OPTION. IN A 3D CASE IT USES 4 VERTICES TO MAKE A SQUARE IN A PLANE, NOT A 3D SIMPLEX. TURNED IT OFF. CONSEQUENCE: I NEED TO RE-MAKE A NEW 'Delaunay' OBJECT EVERYTIME THE GRID IS REFINED. """ # tri = Delaunay(xi_k_jl, incremental=True) tri = Delaunay(xi_k_jl) else: tri = Tri1D(xi_k_jl) return tri
[docs] def find_pmax(self, n_s): """ Finds the maximum polynomial stencil order that can be sustained given the current number of code evaluations. The stencil size required for polynonial order p is given by; stencil size = (n_xi + p)! / (n_xi!p!) where n_xi is the number of random inputs. This formula is used to find p. Parameters ---------- n_xi : int Number of random parameters. n_s : int Number of code samples. Returns ------- int The highest polynomial order that can be sustained given the number of code evaluations. """ p = 1 stencil_size = factorial(self.n_xi + p) / (factorial(self.n_xi) * factorial(p)) while stencil_size <= n_s: p += 1 stencil_size = factorial(self.n_xi + p) / (factorial(self.n_xi) * factorial(p)) return p - 1
[docs] def set_pmax_cutoff(self, pmax_cutoff): """ Set the maximum allowed polynomial value of the surrogate. Parameters ---------- p_max_cutoff : int The max polynomial order. Returns ------- None. """ self.pmax_cutoff = pmax_cutoff self.i_norm_le_pj = self.compute_i_norm_le_pj(pmax_cutoff)
[docs] def compute_vol(self): """ Use analytic formula to compute the volume of all simplices. https://en.wikipedia.org/wiki/Simplex#Volume Returns ------- vols : array, size (n_e,) The volumes of the n_e simplices. """ n_e = self.tri.nsimplex vols = np.zeros(n_e) for j in range(n_e): xi_k_jl = self.tri.points[self.tri.simplices[j]] D = np.zeros([self.n_xi, self.n_xi]) for i in range(self.n_xi): D[:, i] = xi_k_jl[i, :] - xi_k_jl[-1, :] det_D = np.linalg.det(D) if det_D == 0: print('Warning: zero determinant in volume calculation.') vols[j] = 1. / factorial(self.n_xi) * np.abs(det_D) return vols
[docs] def compute_xi_center_j(self): """ Compute the center of all simplex elements. Returns ------- xi_center_j : array, size (n_e,) The cell centers of the n_e simplices. """ # number of simplex elements n_e = self.tri.nsimplex xi_center_j = np.zeros([n_e, self.n_xi]) for j in range(n_e): xi_center_j[j, :] = 1 / (self.n_xi + 1) * \ np.sum(self.tri.points[self.tri.simplices[j]], 0) return xi_center_j
[docs] def compute_sub_simplex_vertices(self, simplex_idx): """ Compute the vertices of the sub-simplex. The sub simplex is contained in the larger simplex. The larger simplex is refined by randomly placing a sample within the sub simplex. Parameters ---------- simplex_idx : int The index of the simplex Returns ------- xi_sub_jl : array, size (n_xi + 1, n_xi) The vertices of the sub simplex. """ xi_k_jl = self.tri.points[self.tri.simplices[simplex_idx]] xi_sub_jl = np.zeros([self.n_xi + 1, self.n_xi]) for l in range(self.n_xi + 1): idx = np.delete(range(self.n_xi + 1), l) xi_sub_jl[l, :] = np.sum(xi_k_jl[idx], 0) xi_sub_jl = xi_sub_jl / self.n_xi return xi_sub_jl
[docs] def sample_inputs(self, n_mc): """ Draw n_mc random values from the input distribution. Parameters ---------- n_mc : int The number of Monte Carlo samples. Returns ------- xi : array, shape(n_mc, n_xi) n_mc random samples from the n_xi input distributions. """ xi = np.zeros([n_mc, self.n_xi]) for i, param in enumerate(self.vary.get_values()): xi[:, i] = param.sample(n_mc) return xi
[docs] def compute_probability(self): """ Compute the probability Omega_j for all simplex elements; Omega_j = int p(xi) dxi, with integration separately over each simplex using Monte Carlo sampling. Returns ------- prob_j : array, size (n_e,) The probabilities of each simplex. """ n_e = self.tri.nsimplex print('Computing simplex probabilities...') prob_j = np.zeros(n_e) # number of MC samples n_mc = np.min([10 ** (self.n_xi + 3), 10 ** 7]) xi = self.sample_inputs(n_mc) # find the simplix indices of xi idx = self.tri.find_simplex(xi) # compute simplex probabolities for i in range(n_mc): prob_j[idx[i]] += 1 prob_j = prob_j / np.double(n_mc) print('done, used ' + str(n_mc) + ' samples.') zero_idx = (prob_j == 0).nonzero()[0].size print('% of simplices with no samples = ' + str((100.0 * zero_idx) / n_e)) return prob_j
[docs] def compute_i_norm_le_pj(self, p_max): """ Compute the multi-index set {i | |i| = i_1 + ... + i_{n_xi} <= p}, for p = 1,...,p_max Parameters ---------- p_max : int The max polynomial order of the index set. Returns ------- i_norm_le_pj : dict The Np + 1 multi indices per polynomial order. """ i_norm_le_pj = {} for p in range(1, p_max + 1): # max(i_1, i_2, ...i _{n_xi}) <= p multi_idx = np.array(list(product(range(p + 1), repeat=self.n_xi))) # i_1 + i_2 <= N idx = np.where(np.sum(multi_idx, axis=1) <= p)[0] multi_idx = multi_idx[idx] i_norm_le_pj[p] = multi_idx return i_norm_le_pj
[docs] def compute_Psi(self, xi_Sj, pmax): """ Compute the Vandermonde matrix Psi, given N + 1 points xi from the j-th interpolation stencil, and a multi-index set of polynomial orders |i| = i_1 + ... + i_n_xi <= polynomial order. Parameters ---------- xi_Sj : array, shape (N + 1, n_xi) The simplex n_xi-dimensional points of the j-th interpolation stencil S_j. pmax : int The max polynomial order of the local stencil. Returns ------- Psi : array, shape (N + 1, N + 1) The Vandermonde interpolation matrix consisting of monomials xi_1 ** i_1 + ... + xi_{n_xi} ** i_{n_xi}. """ Np1 = xi_Sj.shape[0] Psi = np.ones([Np1, Np1]) for row in range(Np1): for col in range(Np1): for j in range(self.n_xi): # compute monomial xi_1 ** i_1 + ... + xi_{n_xi} ** i_{n_xi} Psi[row, col] *= xi_Sj[row][j] ** self.i_norm_le_pj[pmax][col][j] return Psi
[docs] def w_j(self, xi, c_jl, pmax): """ Compute the surrogate local interpolation at point xi. # TODO: right now this assumes a scalar output. Modify the code for vector-valued outputs. Parameters ---------- xi : array, shape (n_xi,) A point inside the stochastic input domain. c_jl : array, shape (N + 1,) The interpolation coefficients of the j-th stencil, with l = 0, ..., N. pmax : int The max polynomial order of the local stencil. Returns ------- w_j_at_xi : float The surrogate prediction at xi. """ # number of points in the j-th interpolation stencil Np1 = c_jl.shape[0] # vector of interpolation monomials Psi_xi = np.ones([Np1, 1]) for i in range(Np1): for j in range(self.n_xi): # take the power of xi to multi index entries Psi_xi[i] *= xi[j] ** self.i_norm_le_pj[pmax][i][j] # surrogate prediction w_j_at_xi = np.sum(c_jl * Psi_xi) return w_j_at_xi
[docs] def check_LEC(self, p_j, v, S_j, n_mc, max_jobs=4): """ Check the Local Extremum Conserving propery of all simplex elements. Parameters ---------- p_j : array, shape (n_e,) The polynomial order of each element. v : array, shape (N + 1,) The (scalar) code outputs. #TODO:modify when vectors are allowed S_j : array, shape (n_e, n_s) The indices of all nearest neighbours points of each simplex j=1,..,n_e, ordered from closest to the neighbour that furthest away. The first n_xi + 1 indeces belong to the j-th simplex itself. n_mc : int The number of Monte Carlo samples to use in checking the LEC conditions. max_jobs : int The number of LEC check (one per element) that can be run in parallel. Returns ------- None. """ n_e = self.tri.nsimplex print('Checking LEC condition of ' + str(n_e) + ' stencils...') # multi-processing pool which will hold the check_LEC_j jobs jobs = [] queues = [] running_jobs = 0 j = 0 n_jobs = n_e el_idx = {} # r = np.array(list(product([0.25, 0.75], repeat=self.n_xi))) while n_jobs > 0: # check how many processes are still alive for p in jobs: if p.is_alive() == False: jobs.remove(p) # number of processes still running running_jobs = len(jobs) # re-fill jobs with max max_jobs processes while running_jobs < max_jobs and n_jobs > 0: queue = mp.Queue() prcs = mp.Process(target=self.check_LEC_j, args=(p_j[j], v, S_j[j, :], n_mc, queue)) prcs.start() running_jobs += 1 n_jobs -= 1 j += 1 jobs.append(prcs) queues.append(queue) # retrieve results for j in range(n_e): # jobs[j].join() tmp = queues[j].get() p_j[j] = tmp['p_j[j]'] el_idx[j] = tmp['el_idx_j'] # singular_idx = (p_j == -99).nonzero()[0] # # jobs = [] # queues = [] # n_jobs = singular_idx.size # # if singular_idx.size > 0: # nnS_j = compute_stencil_j(tri) # # k = 0 # while n_jobs > 0: # # #check how many processes are still alive # for p in jobs: # if p.is_alive() == False: # jobs.remove(p) # # #number of processes still running # running_jobs = len(jobs) # # while running_jobs < max_jobs and n_jobs > 0: # #print 'Re-computing S_j for j = ' + str(j) # queue = mp.Queue() # #S_j[j,:], p_j[j], el_idx[j] = \ # j = singular_idx[k] # k += 1 # prcs = mp.Process(target=non_singular_stencil_j, \ # args = (tri, p_j, S_j, j, nnS_j, i_norm_le_pj, el_idx, queue,)) # prcs.start() # jobs.append(prcs) # queues.append(queue) # # running_jobs += 1 # n_jobs -= 1 # # #retrieve results # idx = 0 # for j in singular_idx: # #jobs[idx].join() # tmp = queues[idx].get() # S_j[j,:] = tmp['S_j[j,:]'] # p_j[j] = tmp['p_j[j]'] # el_idx[j] = tmp['el_idx[j]'] # idx += 1 print('done.') return {'p_j': p_j, 'S_j': S_j, 'el_idx': el_idx}
[docs] def find_simplices(self, S_j): """ Find the simplex element indices that are in point stencil S_j. Parameters ---------- S_j : array, shape (N + 1,) The interpolation stencil of the j-th simplex element, expressed as the indices of the simplex points. Returns ------- idx : array The element indices of stencil S_j. """ n_e = self.tri.nsimplex # if the overlap between element i and S_j = n_xi + 1, element i # is in S_j idx = [np.in1d(self.tri.simplices[i], S_j).nonzero()[0].size for i in range(n_e)] idx = (np.array(idx) == self.n_xi + 1).nonzero()[0] return idx
[docs] def check_LEC_j(self, p_j, v, S_j, n_mc, queue): """ Check the LEC conditin of the j-th interpolation stencil. Parameters ---------- p_j : int The polynomial order of the j-th stencil. v : array The code samples. S_j : array, shape (N + 1,) The interpolation stencil of the j-th simplex element, expressed as the indices of the simplex points. n_mc : int The number of Monte Carlo samples to use in checking the LEC conditions. queue : multiprocessing queue object Used to store the results. Returns ------- None, results (polynomial order and element indices are stored in the queue) """ n_xi = self.n_xi # n_e = self.tri.nsimplex N = v[0, :].size # the number of points in S_j Np1_j = int(factorial(n_xi + p_j) / (factorial(n_xi) * factorial(p_j))) # select the vertices of stencil S_j xi_Sj = self.tri.points[S_j[0:Np1_j]] # find the corresponding indices of v v_Sj = v[S_j[0:Np1_j], :] # the element indices of the simplices in stencil S_j el_idx_j = self.find_simplices(S_j[0:Np1_j]) # compute sample matrix Psi = self.compute_Psi(xi_Sj, p_j) # check if Psi is well poised # det_Psi = np.linalg.det(Psi) # if det_Psi == 0: # #print 'Warning: determinant Psi is zero.' # #print 'Reducing local p_j from ' + str(p_j[j]) + ' to a lower value.' # #return an error code # return queue.put({'p_j[j]':-99, 'el_idx_j':el_idx_j}) # compute the coefficients c_jl # c_jl = np.linalg.solve(Psi, v_Sj) c_jl = DAFSILAS(Psi, v_Sj) # check the LEC condition for all simplices in the STENCIL S_j k = 0 LEC_checked = False while LEC_checked == False and p_j != 1: # sample the simplices in stencil S_j xi_samples = self.sample_simplex(n_mc, self.tri.points[self.tri.simplices[el_idx_j[k]]]) # function values at the edges of the k-th simplex in S_j v_min = np.min(v[self.tri.simplices[el_idx_j[k]]]) v_max = np.max(v[self.tri.simplices[el_idx_j[k]]]) # compute interpolation values at MC sample points w_j_at_xi = np.zeros([n_mc, N]) for i in range(n_mc): w_j_at_xi[i, :] = self.w_j(xi_samples[i], c_jl, p_j) k += 1 # if LEC is violated in any of the simplices # TODO: make work for vector-valued outputs eps = 0 if (w_j_at_xi.min() <= v_min - eps or w_j_at_xi.max() >= v_max + eps) and p_j > 1: p_j -= 1 # the new number of points in S_j Np1_j = int(factorial(n_xi + p_j) / (factorial(n_xi) * factorial(p_j))) # select the new vertices of stencil S_j xi_Sj = self.tri.points[S_j[0:Np1_j]] # find the new corresponding indices of v v_Sj = v[S_j[0:Np1_j], :] # the new element indices of the simplices in stencil S_j el_idx_j = self.find_simplices(S_j[0:Np1_j]) k = 0 if p_j == 1: return queue.put({'p_j[j]': p_j, 'el_idx_j': el_idx_j}) # recompute sample matrix Psi = self.compute_Psi(xi_Sj, p_j) # check if Psi is well poised # det_Psi = np.linalg.det(Psi) # if det_Psi == 0: # #print 'Warning: determinant Psi is zero.' # #print 'Reducing local p_j from ' + str(p_j[j]) + ' to a lower value.' # #return an error code # return queue.put({'p_j[j]':-99, 'el_idx_j':el_idx_j}) # compute the coefficients c_jl # c_jl = np.linalg.solve(Psi, v_Sj) c_jl = DAFSILAS(Psi, v_Sj, False) if k == el_idx_j.size: LEC_checked = True queue.put({'p_j[j]': p_j, 'el_idx_j': el_idx_j})
[docs] def compute_stencil_j(self): """ Compute the nearest neighbor stencils of all simplex elements. The distance to all points are measured with respect to the cell center of each element. Returns ------- S_j : array, shape (n_e, n_s) The indices of all nearest neighbours points of each simplex j=1,..,n_e, ordered from closest to the neighbour that furthest away. The first n_xi + 1 indeces belong to the j-th simplex itself. """ n_e = self.tri.nsimplex n_s = self.tri.npoints n_xi = self.n_xi S_j = np.zeros([n_e, n_s]) # compute the center of each element xi_center_j = self.compute_xi_center_j() for j in range(n_e): # the number of points in S_j # Np1_j = factorial(n_xi + p_j[j])/(factorial(n_xi)*factorial(p_j[j])) # k = {1,...,n_s}\{k_j0, ..., k_jn_xi} idx = np.delete(range(n_s), self.tri.simplices[j]) # store the vertex indices of the element itself S_j[j, 0:n_xi + 1] = np.copy(self.tri.simplices[j]) # loop over the set k dist = np.zeros(n_s - n_xi - 1) for i in range(n_s - n_xi - 1): # ||xi_k - xi_center_j|| dist[i] = np.linalg.norm(self.tri.points[idx[i]] - xi_center_j[j, :]) # store the indices of the points, sorted based on distance wrt # simplex center j. Store only the amount of indices allowed by p_j. S_j[j, n_xi + 1:] = idx[np.argsort(dist)] return S_j.astype('int')
[docs] def compute_ENO_stencil(self, p_j, S_j, el_idx, max_jobs=4): """ Compute the Essentially Non-Oscillatory stencils. The idea behind ENO stencils is to have higher degree interpolation stencils up to a thin layer of simplices containing the discontinuity. For a given simplex, its ENO stencil is created by locating all the nearest-neighbor stencils that contain element j , and subsequently selecting the one with the highest polynomial order p_j . This leads to a Delaunay triangulation which captures the discontinuity better than its nearest-neighbor counterpart. Parameters ---------- p_j : array, shape (n_e,) The polynomial order of each simplex element. S_j : array, shape (n_e, n_s) The indices of all nearest neighbours points of each simplex j=1,..,n_e, ordered from closest to the neighbour that furthest away. The first n_xi + 1 indeces belong to the j-th simplex itself. el_idx : dict The element indices for each interpolation stencil. el_idx[2] gives the elements indices of the 3rd interpolation stencil. The number of elements is determined by the local polynomial order. max_jobs : int, optional The number of ENO stencils that are computed in parallel. The default is 4. Returns ------- ENO_S_j : array, shape (n_e, n_s) The ENO stencils for each element. p_j : array, shape (n_e,) The new polynomial order of each element. el_idx : dict The new element indices for each interpolation stencil. """ n_e = self.tri.nsimplex n_s = self.tri.npoints # array to store the ENO stencils ENO_S_j = np.zeros([n_e, n_s]).astype('int') # the center of each simplex xi_centers = self.compute_xi_center_j() jobs = [] queues = [] running_jobs = 0 j = 0 n_jobs = n_e print('Computing ENO stencils...') while n_jobs > 0: # check how many processes are still alive for p in jobs: if p.is_alive() == False: jobs.remove(p) # number of processes still running running_jobs = len(jobs) # compute the ENO stencils (in parallel) while running_jobs < max_jobs and n_jobs > 0: queue = mp.Queue() prcs = mp.Process(target=self.compute_ENO_stencil_j, args=(p_j, S_j, xi_centers, j, el_idx, queue)) # print check_LEC_j(tri, p_j, v, S_j, n_mc, j) jobs.append(prcs) queues.append(queue) prcs.start() n_jobs -= 1 running_jobs += 1 j += 1 # retrieve results for j in range(n_e): tmp = queues[j].get() ENO_S_j[j, :] = tmp['ENO_S_j'] p_j[j] = tmp['p_j_new'] el_idx[j] = tmp['el_idx[j]'] print('done.') return ENO_S_j, p_j, el_idx
[docs] def compute_ENO_stencil_j(self, p_j, S_j, xi_centers, j, el_idx, queue): """ Compute the ENO stencil of the j-th element. Parameters ---------- p_j : array, shape (n_e,) The polynomial order of each simplex element. S_j : array, shape (n_e, n_s) The indices of all nearest neighbours points of each simplex j=1,..,n_e, ordered from closest to the neighbour that furthest away. The first n_xi + 1 indeces belong to the j-th simplex itself. xi_centers : array, shape (n_e, ) The center of each simplex. j : int The index of the current simplex. el_idx : dict The element indices for each interpolation stencil. el_idx[2] gives the elements indices of the 3rd interpolation stencil. The number of elements is determined by the local polynomial order. queue : multiprocessing queue object Used to store the results. Returns ------- None, results (polynomial order and element indices are stored in the queue) """ n_e = self.tri.nsimplex # set the stencil to the nearest neighbor stencil ENO_S_j = np.copy(S_j[j, :]) p_j_new = np.copy(p_j[j]) # loop to find alternative candidate stencils S_ji with p_j>1 that contain k_jl idx = (p_j == 1).nonzero()[0] all_with_pj_gt_1 = np.delete(range(n_e), idx) for i in all_with_pj_gt_1: # found a candidate stencil S_ji if np.in1d(j, el_idx[i]): # the candidate stencil has a higher polynomial degree: accept if p_j[i] > p_j_new: ENO_S_j = np.copy(S_j[i, :]) p_j_new = np.copy(p_j[i]) el_idx[j] = np.copy(el_idx[i]) # the candidate stencil has the same polynomial degree: accept # the one with smallest avg Euclidian distance to the cell center elif p_j_new == p_j[i]: dist_i = np.linalg.norm(xi_centers[el_idx[i]] - xi_centers[j], 2, axis=1) dist_j = np.linalg.norm(xi_centers[el_idx[j]] - xi_centers[j], 2, axis=1) # if the new distance is smaller than the old one: accept if np.sum(dist_i) < np.sum(dist_j): ENO_S_j = np.copy(S_j[i, :]) el_idx[j] = np.copy(el_idx[i]) queue.put({'ENO_S_j': ENO_S_j, 'p_j_new': p_j_new, 'el_idx[j]': np.copy(el_idx[j])})
[docs] def sample_simplex(self, n_mc, xi_k_jl, check=False): """ Use an analytical map from n_mc uniformly distributed points in the n_xi-dimensional hypercube, to uniformly distributed points in the target simplex described by the nodes xi_k_jl. Derivation: Edeling, W. N., Dwight, R. P., & Cinnella, P. (2016). Simplex-stochastic collocation method with improved scalability. Journal of Computational Physics, 310, 301-328. Parameters ---------- n_mc : int The number of Monte Carlo samples. xi_k_jl : array, shape (n_xi + 1, n_xi) The nodes of the target simplex. check : bool, optional Check is the random samples actually all lie insiide the target simplex. The default is False. Returns ------- P : array, shape (n_mc, n_xi) Uniformly distributed points inside the target simplex. """ n_xi = self.n_xi P = np.zeros([n_mc, n_xi]) for k in range(n_mc): # random points inside the hypercube r = np.random.rand(n_xi) # the term of the map is \xi_k_j0 sample = np.copy(xi_k_jl[0]) for i in range(1, n_xi + 1): prod_r = 1. # compute the product of r-terms: prod(r_{n_xi-j+1}^{1/(n_xi-j+1)}) for j in range(1, i + 1): prod_r *= r[n_xi - j]**(1. / (n_xi - j + 1)) # compute the ith term of the sum: prod_r*(\xi_i-\xi_{i-1}) sample += prod_r * (xi_k_jl[i] - xi_k_jl[i - 1]) P[k, :] = sample # check if any of the samples are outside the simplex if check: outside_simplex = 0 avg = np.sum(xi_k_jl, 0) / (n_xi + 1.) el = self.tri.find_simplex(avg) for i in range(n_mc): if self.tri.find_simplex(P[i, :]) != el: outside_simplex += 1 print('Number of samples outside target simplex = ' + str(outside_simplex)) return P
[docs] def sample_simplex_edge(self, simplex_idx, refined_edges): """ Refine the longest edge of a simplex. # TODO: is allright for 2D, but does this make sense in higher dims? Parameters ---------- simplex_idx : int The index of the simplex. refined_edges : list Contains the pairs of the point indices, corresponding to edges that have been refined in the current iteration. Simplices share edges, and this list is used to prevent refining the same edge twice within the same iteration of the SSC algorihm. Returns ------- xi_new : array, shape (n_xi,) The newly added point (along the longest edge). refined_edges : list The updated refined_edges list. already_refined : bool Returns True if the edge already has been refined. """ # the point indices of the simplex selected for refinement simplex_point_idx = self.tri.simplices[simplex_idx] # the points of the simplex selected for refinement xi_k_jl = self.tri.points[simplex_point_idx] # find the indices of all edges, i.e. the combination of all possible # 2 distinct elements from range(n_xi + 1) comb = list(combinations(range(self.n_xi + 1), 2)) # compute all edge lengths, select the largest edge_lengths = np.zeros(len(comb)) for i in range(len(comb)): edge_lengths[i] = np.linalg.norm(xi_k_jl[comb[i][1], :] - xi_k_jl[comb[i][0], :]) idx = np.argmax(edge_lengths) # if there are 2 or more edge lengths that are the same, select the 1st if idx.size > 1: idx = idx[0] # edge points xi_0 = xi_k_jl[comb[idx][0], :] xi_1 = xi_k_jl[comb[idx][1], :] # simplices share edges, make sure it was not already refined during # this iteration current_edge = np.array([simplex_point_idx[comb[idx][0]], simplex_point_idx[comb[idx][1]]]) already_refined = False for edge in refined_edges: if set(edge) == set(current_edge): already_refined = True if not already_refined: refined_edges.append(current_edge) # place random sample at +/- 10% of edge center b = 0.6 a = 0.4 U = np.random.rand() * (b - a) + a xi_new = xi_0 + U * (xi_1 - xi_0) return xi_new, refined_edges, already_refined
[docs] def compute_surplus_k(self, xi_k_jref, S_j, p_j, v, v_k_jref): """ Compute the hierachical surplus at xi_k_jref (the refinement location), defined as the difference between the new code sample and the (old) surrogate prediction at the refinement location. Parameters ---------- xi_k_jref : array, shape (n_xi,) The refinement location. S_j : array, shape (n_e, n_s) The indices of all nearest neighbours points of each simplex j=1,..,n_e, ordered from closest to the neighbour that furthest away. The first n_xi + 1 indeces belong to the j-th simplex itself. p_j : array, shape (n_e,) The polynomial order of each simplex element. v : array, shape (N + 1,) The (scalar) code outputs. #TODO:modify when vectors are allowed v_k_jref : float #TODO:modify when vectors are allowed The code prediction at the refinement location. Returns ------- surplus : float #TODO:modify when vectors are allowed The hierarchical suplus """ w_k_jref = self.surrogate(xi_k_jref, S_j, p_j, v) # compute the hierarcical surplus between old interpolation and new v value surplus = w_k_jref - v_k_jref return surplus
[docs] def surrogate(self, xi, S_j, p_j, v): """ Evaluate the SSC surrogate at xi. Parameters ---------- xi : array, shape (n_xi,) The location in the input space at which to evaluate the surrogate. S_j : array, shape (n_e, n_s) The indices of all nearest neighbours points of each simplex j=1,..,n_e, ordered from closest to the neighbour that furthest away. The first n_xi + 1 indeces belong to the j-th simplex itself. p_j : array, shape (n_e,) The polynomial order of each simplex element. v : array, shape (N + 1,) The (scalar) code outputs. #TODO:modify when vectors are allowed Returns ------- None. """ n_xi = self.n_xi idx = int(self.tri.find_simplex(xi)) # the number of points in S_j Np1_j = int(factorial(n_xi + p_j[idx]) / (factorial(n_xi) * factorial(p_j[idx]))) # the vertices of the stencil S_j xi_Sj = self.tri.points[S_j[idx, 0:Np1_j]] # find the corresponding indices of v v_Sj = v[S_j[idx, 0:Np1_j], :] # compute sample matrix Psi = self.compute_Psi(xi_Sj, p_j[idx]) # #check if Psi is well poised, at this point all stencils should be well-poised # det_Psi = np.linalg.det(Psi) # if det_Psi == 0: # print 'Error, det(Psi)=0 in compute_surplus_k() method, should not be possible' # compute the coefficients c_jl # c_jl = np.linalg.solve(Psi, v_Sj) c_jl = DAFSILAS(Psi, v_Sj, False) # compute the interpolation on the old grid w_j = self.w_j(xi, c_jl, p_j[idx]) return w_j
[docs] def compute_eps_bar_j(self, p_j, prob_j): """ Compute the geometric refinement measure \bar{\\eps}_j for all elements, Elements with the largest values will be selected for refinement. Parameters ---------- p_j : array, shape (n_e,) The polynomial order of each simplex element. prob_j : array, shape (n_e,) The probability of each simplex element. Returns ------- eps_bar_j : array, shape (n_e,) The geometric refinement measure. vol_j : array, shape (n_e,) The volume of each simplex element. """ n_e = self.tri.nsimplex eps_bar_j = np.zeros(n_e) n_xi = self.tri.ndim vol_j = self.compute_vol() vol = np.sum(vol_j) for j in range(n_e): O_j = (p_j[j] + 1.0) / n_xi eps_bar_j[j] = prob_j[j] * (vol_j[j] / vol) ** (2 * O_j) return eps_bar_j, vol_j
[docs] def find_boundary_simplices(self): """ Find the simplices that are on the boundary of the hypercube . Returns ------- idx : array Indices of the boundary simplices. """ idx = (self.tri.neighbors == -1).nonzero()[0] return idx
[docs] def update_Delaunay(self, new_points): """ Update the Delaunay triangulation with P new points. Parameters ---------- new_points : array, shape (P, n_xi) P new n_xi-dimensional points. Returns ------- None. """ xi_k_jl = np.append(self.tri.points, new_points, 0) if self.n_xi > 1: self.tri = Delaunay(xi_k_jl) else: self.tri = Tri1D(xi_k_jl) self._n_samples = self.tri.npoints
[docs] def get_Delaunay(self): """ Return the SciPy Delaunay triangulation. """ return self.tri
[docs] def is_finite(self): return True
@property def n_samples(self): """ Returns the number of samples code samples. """ return self._n_samples @property def n_dimensions(self): """ Returns the number of uncertain random variables """ return self.n_xi @property def n_elements(self): """ Returns the number of simplex elements """ return self.tri.nsimplex def __next__(self): if self.count < self._n_samples: run_dict = {} i_par = 0 for param_name in self.vary.get_keys(): run_dict[param_name] = self.tri.points[self.count][i_par] i_par += 1 self.count += 1 return run_dict else: raise StopIteration
[docs] def save_state(self, filename): """ Save the state of the sampler to a pickle file. Parameters ---------- filename : string File name. Returns ------- None. """ print("Saving sampler state to %s" % filename) with open(filename, 'wb') as fp: pickle.dump(self.__dict__, fp)
[docs] def load_state(self, filename): """ Load the state of the sampler from a pickle file. Parameters ---------- filename : string File name. Returns ------- None. """ print("Loading sampler state from %s" % filename) with open(filename, 'rb') as fp: self.__dict__ = pickle.load(fp)
[docs]def DAFSILAS(A, b, print_message=False): """ Direct Algorithm For Solving Ill-conditioned Linear Algebraic Systems, solves the linear system when Ax = b when A is ill conditioned. Solves for x in the non-null subspace of the solution as described in the reference below. This method utilizes Gauss–Jordan elimination with complete pivoting to identify the null subspace of a (almost) singular matrix. X. J. Xue, Kozaczek, K. J., Kurtzl, S. K., & Kurtz, D. S. (2000). A direct algorithm for solving ill-conditioned linear algebraic systems. Adv. X-Ray Anal, 42. """ # The matrix A' as defined in Xue b = b.reshape(b.size) Ap = np.zeros([A.shape[0], 2 * A.shape[0] + 1]) Ap[:, 0:A.shape[0]] = np.copy(A) Ap[:, A.shape[0]] = np.copy(b) Ap[:, A.shape[0] + 1:] = np.eye(A.shape[0]) n, m = Ap.shape # permutation matrix P = np.eye(n) # the ill-condition control parameter # epsilon = np.finfo(np.float64).eps epsilon = 10**-14 for i in range(n - 1): # calc sub matrix Ai Ai = np.copy(Ap[i:n, i:n]) # find the complete pivot in sub matrix Ai api = np.max(np.abs(Ai)) if api == 0: break # find the location of the complete pivot in Ai row, col = np.unravel_index(np.abs(Ai).argmax(), Ai.shape) # interchange rows and columns to exchange position of api and aii tmp = np.copy(Ap[i, :]) Ap[i, :] = np.copy(Ap[i + row, :]) Ap[i + row, :] = tmp tmp = np.copy(Ap[:, i]) Ap[:, i] = np.copy(Ap[:, i + col]) Ap[:, i + col] = tmp # Also interchange the entries in b # tmp = A[i, n] # A[i, n] = A[i+col, n]Ap[i+1+j, i:m] # A[i+col, n] = tmp # keep track of column switches via a series of permuation matrices P = # P1*P2*...*Pi*...*Pn ==> at each iteration x = P*xi Pi = np.eye(n) tmp = np.copy(Pi[i, :]) Pi[i, :] = np.copy(Pi[i + col, :]) Pi[i + col, :] = tmp P = np.dot(P, Pi) # Calculate multipliers if Ai[row, col] < 0: api = api * -1. # sign is important in multipliers M = Ap[i + 1:n, i] / np.double(api) # start row reduction for j in range(M.size): Ap[i + 1 + j, i:m] = Ap[i + 1 + j, i:m] - M[j] * Ap[i, i:m] # the largest complete pivot eta = np.max(np.abs(np.diag(Ap))) * 1.0 # test if |aii/nc| <= epsilon idx = (np.abs(np.diag(Ap) / eta) <= epsilon).nonzero()[0] # Perform zeroing operation if necessary if idx.size > 0: nullity = idx.size Arange = Ap[0:n - nullity, 0:n - nullity] if print_message: print('Matrix is ill-conditioned, performing zeroing operation') print('nullity = ' + str(nullity) + ', rank = ' + str(n - nullity) + ', cond(A) = ' + str(np.linalg.cond(A)) + ', cond(Arange) = ' + str(np.linalg.cond(Arange))) # ajj = 1, aij = 0 for j = i...n Ap[idx[0]:n, idx[0]:n] = np.eye(nullity) # bj = 0 Ap[idx[0]:n, n] = 0 # ejj = 1, eij = 0 Ap[idx[0]:n, idx[0] + n + 1:m] = np.eye(nullity) # Back substitution for i in range(n, 0, -1): Ai = Ap[0:i, :] # Calculate multipliers M = Ai[0:i - 1, i - 1] / np.double(Ai[i - 1, i - 1]) # start back substitution for j in range(M.size): Ai[j, :] = Ai[j, :] - M[j] * Ai[i - 1, :] # store result in A Ap[0:i, :] = Ai # turn A into eye(n) D = (1. / np.diag(Ap)).reshape([n, 1]) Ap = np.multiply(D, Ap) # Calculated solution return np.dot(P, Ap[:, n]).reshape([n, 1])
[docs]class Tri1D: """ 1D "triangulation" that mimics the following SciPy Delaunay properties: * ndim * points * npoints * nsimplex * simplices * neighbours * the find_simplex subroutine """ def __init__(self, points): """ Create a 1D Triangulation object that can we used by the SSCSampler in the same way as the SciPy Delaunay triangulation. Parameters ---------- points : array, shape (n_s, ) A 1D array of nodes. Returns ------- None. """ self.ndim = 1 self.points = points self.npoints = points.size self.nsimplex = points.size - 1 points_sorted = np.sort(self.points.reshape(self.npoints)) self.simplices = np.zeros([self.nsimplex, 2]) for i in range(self.nsimplex): self.simplices[i, 0] = (self.points == points_sorted[i]).nonzero()[0] self.simplices[i, 1] = (self.points == points_sorted[i + 1]).nonzero()[0] self.simplices = self.simplices.astype('int') self.neighbors = np.zeros([self.nsimplex, 2]) for i in range(self.nsimplex): self.neighbors[i, 0] = i - 1 self.neighbors[i, 1] = i + 1 self.neighbors[-1, 1] = -1 self.neighbors = self.neighbors.astype('int')
[docs] def find_simplex(self, xi): """ Find the simplex indices of nodes xi Parameters ---------- xi : array, shape (S,) An array if S 1D points. Returns ------- array An array containing the simplex indices of points xi. """ Idx = np.zeros(xi.shape[0]) points_sorted = np.sort(self.points.reshape(self.npoints)) for i in range(xi.shape[0]): idx = (points_sorted < xi[i]).argmin() - 1 # if xi = 0 idx will be -1 if idx == -1: Idx[i] = 0 else: Idx[i] = idx return Idx.astype('int')