Source code for ezyrb.plugin.aggregation

from .plugin import Plugin
import numpy as np
from ..approximation.rbf import RBF
from ..approximation.linear import Linear
from ..database import Database
import copy
from scipy.optimize import minimize


[docs] class Aggregation(Plugin): """ The `Aggregation` plugin class. It implements a general framework for space-dependent aggregation of multiple reduced order models (ROMs). The aggregation weights can be computed either through standard space-dependent methods (e.g., Gaussian functions) or by fitting a regression/interpolation model (e.g., RBF, linear, ANN) on the validation set. Reference: Ivagnes A., Tonicello N., Rozza G. (2024). `Enhancing non-intrusive reduced-order models with space-dependent aggregation methods`, Acta Mechanica. :param fit_function: a regression/interpolation model to fit the weights in the validation set. If None, standard space-dependent aggregation methods are used (default is None). :param predict_function: a regression/interpolation model to predict the weights in the test set. Default is a linear interpolation. Example: >>> from ezyrb import POD, RBF, GPR, Database, Snapshot, Parameter, Linear, ANN >>> from ezyrb import ReducedOrderModel as ROM >>> from ezyrb.plugin import Aggregation >>> pod = POD(rank=10) >>> rbf = RBF() >>> gpr = GPR() >>> db = Database(param, snapshots.T) >>> rom1 = ROM(db, pod, rbf) >>> rom2 = ROM(db, pod, gpr) >>> mrom = MROM({'rbf': rom1, 'gpr': rom2}, plugins=[Aggregation( ... fit_function=None, predict_function=Linear())]) >>> mrom.fit() >>> pred = mrom.predict(new_param) """ def __init__(self, fit_function=None, predict_function=Linear()): """ Initialize the Aggregation plugin. :param fit_function: Regression model to fit weights in validation set. If None, uses standard space-dependent methods. Default is None. :param predict_function: Regression model to predict weights in test set. Default is Linear(). """ super().__init__() self.fit_function = fit_function self.predict_function = predict_function
[docs] def _check_sum_gaussians(self, mrom, sum_gaussians, gaussians): """ Method to check if the sum of the Gaussian functions is on-zero in all the points of the validation set. If there are zero values, the same weight/gaussian is taken for all the ROMs at those points. :param mrom: the multi reduced order model. :param sum_gaussians: the sum of the Gaussian functions. :param gaussians: the Gaussian functions. :return: the corrected Gaussian functions. """ zero_values = sum_gaussians < 1e-5 # tolerance to avoid numerical issues if zero_values.any(): gaussians[:, zero_values] = 1/len(mrom.roms) # equal weights return gaussians
[docs] def _optimize_sigma(self, mrom, sigma_range=[1e-5, 1e-2]): """ Method to optimize the sigma parameter in the Gaussian functions through a minimization procedure on the validation set. :param mrom: the multi reduced order model. :param sigma_range: the range of values to search for the optimal sigma. :return: the optimal sigma value. """ def obj_func(sigma): # compute test error of multiROM from current sigma g_sigma = self._compute_validation_weights(mrom, sigma, normalized=True) mrom_prediction = np.zeros_like(mrom.validation_full_database.snapshots_matrix) for i, rom in enumerate(mrom.roms): mrom_prediction += g_sigma[i, ...] * mrom.roms[rom].predict(mrom.validation_full_database).snapshots_matrix test_error = np.mean( np.linalg.norm(mrom_prediction - mrom.validation_full_database.snapshots_matrix, axis=1) / np.linalg.norm(mrom.validation_full_database.snapshots_matrix, axis=1)) return test_error # minimization procedure res = minimize(obj_func, x0=sigma_range[0], method="L-BFGS-B", bounds=[sigma_range]) print('Optimal sigma value in weights: ', res.x) return res.x
[docs] def _compute_validation_weights(self, mrom, sigma, normalized=False): """ Method to compute the weights in the validation set through standard space-dependent aggregation methods. :param mrom: the multi reduced order model. :param sigma: the sigma parameter in the Gaussian functions. :param normalized: if True, the weights are normalized to sum to 1 at each spatial point (default is False). :return: the weights in the validation set. """ validation_predicted = dict() for name, rom in mrom.roms.items(): validation_predicted[name] = rom.predict(rom.validation_full_database).snapshots_matrix g = {} for k, v in validation_predicted.items(): snap = rom.validation_full_database.snapshots_matrix g[k] = np.exp(- ((v - snap)**2)/(2 * (sigma**2))) g_tensor = np.array([g[k] for k in g.keys()]) g_tensor = self._check_sum_gaussians(mrom, g_tensor.sum(axis=0), g_tensor) if normalized: g_tensor /= np.sum(g_tensor, axis=0) return g_tensor
[docs] def fit_postprocessing(self, mrom): """ Compute/fit the weights in the validation set. If fit_function is None, the weights are computed through standard space-dependent aggregation methods (e.g., Gaussian functions). If fit_function is not None, the weights are fitted through the provided regression/interpolation model (e.g., RBF, linear, ANN). :param mrom: the multi reduced order model. :return: None """ rom = list(mrom.roms.values())[0] # concatenate params and space params = rom.validation_full_database.parameters_matrix input_list = [] # Loop over the number of snapshots in the validation database for i in range(rom.validation_full_database.snapshots_matrix.shape[0]): # Get the space coordinates for the i-th snapshot space = rom.validation_full_database.get_snapshot_space(i) # Get the parameters for the i-th snapshot param = rom.validation_full_database.parameters_matrix[i, :] # Create the input array for the i-th snapshot snapshot_input = np.hstack([ space, np.tile(param, (space.shape[0], 1)) ]) # Append the input array to the list input_list.append(snapshot_input) # Concatenate the input arrays for all snapshots input_ = np.vstack(input_list) # Fit the regression/interpolation that will be used to predict the # weights in the test database if self.fit_function is None: optimal_sigma = self._optimize_sigma(mrom) g_tensor = self._compute_validation_weights(mrom, optimal_sigma, normalized=False) self.predict_functions = [] for i, rom in enumerate(mrom.roms.values()): g_ = g_tensor[i, ...].reshape(space.shape[0] * params.shape[0], -1) # do a copy of the function used to predict the weights, # otherwise we use the same for all the ROMs rom_func = copy.deepcopy(self.predict_function) # replace NaN with 0 # TODO: this is a temporary fix, we should handle NaNs in a better way g_[np.isnan(g_)] = 1/len(mrom.roms) rom_func.fit(input_, g_) self.predict_functions.append(rom_func) # directly fit a regression to minimize the discrepancy between the aggregation and the snapshots elif self.fit_function is not None: snaps = rom.validation_full_database.snapshots_matrix self.fit_function.fit(input_, snaps.reshape(space.shape[0] * params.shape[0], 1))
[docs] def predict_postprocessing(self, mrom): """ Compute the weights in the test set and the predicted solution as a convex combination of the predicted solutions of the individual ROMs. :param mrom: the multi reduced order model. :return: None """ # prepare the input for the prediction predict_weights = {} db = list(mrom.multi_predict_database.values())[0] input_list = [] # Loop over the number of snapshots in the prediction database for i in range(mrom.predict_full_database.parameters_matrix.shape[0]): # Get the space coordinates for the i-th snapshot space = mrom.train_full_database.get_snapshot_space(i) # Get the parameters for the i-th snapshot param = mrom.predict_full_database.parameters_matrix[i, :] # Create the input array for the i-th snapshot snapshot_input = np.hstack([ space, np.tile(param, (space.shape[0], 1)) ]) # Append the input array to the list input_list.append(snapshot_input) # Concatenate the input arrays for all snapshots input_ = np.vstack(input_list) # initialize weights mrom.weights_predict = {} # predict weights with regression and normalize them # case without fit_function (where we use the standard space-dependent aggregation methods) if self.fit_function is None and self.predict_function is not None: gaussians_test = [] for i, rom in enumerate(mrom.roms.values()): g_ = self.predict_functions[i].predict(input_) gaussians_test.append(g_) gaussians_test = np.array(gaussians_test) gaussians_test = gaussians_test.reshape(len(mrom.roms), db.parameters_matrix.shape[0], space.shape[0]) # normalize weights predict_weights = gaussians_test/np.sum(gaussians_test, axis=0) # compute predicted solution as convex combination of the reduced solutions for i, rom in enumerate(mrom.roms): mrom.weights_predict[rom] = predict_weights[i, ...] db = mrom.multi_predict_database[rom] # case with fit_function (ANN) elif self.fit_function is not None: weights = self.fit_function.predict(input_) # compute predicted solution as convex combination of the reduced solutions for i, rom in enumerate(mrom.roms): mrom.weights_predict[rom] = weights[..., i].reshape( db.parameters_matrix.shape[0], -1) # compute the prediction prediction = np.sum([mrom.weights_predict[k] * mrom.multi_predict_database[k].snapshots_matrix for k in list(mrom.roms.keys())], axis=0) mrom.predict_full_database = Database( mrom.predict_full_database.parameters_matrix, prediction )