"""Module for the Reduced Order Modeling."""
import math
import copy
import pickle
import numpy as np
from scipy.spatial import Delaunay
from sklearn.model_selection import KFold
from .database import Database
class ReducedOrderModel():
"""
Reduced Order Model class.
    This class performs the actual reduced order modeling using the selected
    methods for approximation and reduction.

    :param ezyrb.Database database: the database to use for training the
        reduced order model.
    :param ezyrb.Reduction reduction: the reduction method to use in the
        reduced order model.
    :param ezyrb.Approximation approximation: the approximation method to
        use in the reduced order model.
    :param list plugins: the plugins to apply during the fitting and the
        prediction steps. Default is None, meaning no plugin is used.

    :cvar ezyrb.Database database: the database used for training the
        reduced order model.
    :cvar ezyrb.Reduction reduction: the reduction method used in the
        reduced order model.
    :cvar ezyrb.Approximation approximation: the approximation method used
        in the reduced order model.
    :cvar list plugins: the plugins applied during the fitting and the
        prediction steps.
:Example:
>>> from ezyrb import ReducedOrderModel as ROM
>>> from ezyrb import POD, RBF, Database
>>> pod = POD()
>>> rbf = RBF()
>>> # param, snapshots and new_param are assumed to be declared
>>> db = Database(param, snapshots)
>>> rom = ROM(db, pod, rbf).fit()
>>> rom.predict(new_param)
"""
def __init__(self, database, reduction, approximation,
plugins=None):
self.database = database
self.reduction = reduction
self.approximation = approximation
if plugins is None:
plugins = []
self.plugins = plugins
    def fit(self, *args, **kwargs):
        r"""
        Calculate the reduced space.

        :param \*args: additional parameters to pass to the `fit` method of
            the approximation.
        :param \**kwargs: additional parameters to pass to the `fit` method
            of the approximation.
        :return: the fitted reduced order model.
        :rtype: ReducedOrderModel
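
        :Example:

            A minimal sketch (``db``, ``pod`` and ``rbf`` are assumed to be
            already defined `Database`, `POD` and `RBF` instances, as in the
            class example):

            >>> rom = ReducedOrderModel(db, pod, rbf)
            >>> rom.fit()  # returns the fitted model itself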
"""
        self._full_database = copy.deepcopy(self.database)
# FULL-ORDER PREPROCESSING here
for plugin in self.plugins:
plugin.fom_preprocessing(self)
self.reduction.fit(self._full_database.snapshots_matrix.T)
reduced_snapshots = self.reduction.transform(
self._full_database.snapshots_matrix.T).T
self._reduced_database = Database(
self._full_database.parameters_matrix, reduced_snapshots)
# REDUCED-ORDER PREPROCESSING here
for plugin in self.plugins:
plugin.rom_preprocessing(self)
        self.approximation.fit(
            self._reduced_database.parameters_matrix,
            self._reduced_database.snapshots_matrix,
            *args, **kwargs)
return self
    def predict(self, mu):
        """
        Calculate the predicted solution for the given parametric point(s).

        :param numpy.ndarray mu: the parametric point(s) at which the
            solution is predicted.
        :return: the database containing all the predicted solutions (with
            the corresponding parameters).
        :rtype: Database
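
        :Example:

            A minimal sketch (``rom`` is a fitted model and ``new_param`` is
            assumed to be an already defined parametric point):

            >>> pred_db = rom.predict(new_param)
            >>> pred_db.snapshots_matrix  # the predicted snapshot(s)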
"""
mu = np.atleast_2d(mu)
self._reduced_database = Database(
mu, np.atleast_2d(self.approximation.predict(mu)))
# REDUCED-ORDER POSTPROCESSING here
for plugin in self.plugins:
plugin.rom_postprocessing(self)
        self._full_database = Database(
            mu,
self.reduction.inverse_transform(
self._reduced_database.snapshots_matrix.T).T
)
# FULL-ORDER POSTPROCESSING here
for plugin in self.plugins:
plugin.fom_postprocessing(self)
return self._full_database
    def save(self, fname, save_db=True, save_reduction=True, save_approx=True):
"""
Save the object to `fname` using the pickle module.
:param str fname: the name of file where the reduced order model will
be saved.
:param bool save_db: Flag to select if the `Database` will be saved.
:param bool save_reduction: Flag to select if the `Reduction` will be
saved.
:param bool save_approx: Flag to select if the `Approximation` will be
saved.
        :Example:
>>> from ezyrb import ReducedOrderModel as ROM
>>> rom = ROM(...) # Construct here the rom
>>> rom.fit()
>>> rom.save('ezyrb.rom')
"""
rom_to_store = copy.copy(self)
if not save_db:
del rom_to_store.database
if not save_reduction:
del rom_to_store.reduction
if not save_approx:
del rom_to_store.approximation
with open(fname, 'wb') as output:
pickle.dump(rom_to_store, output, pickle.HIGHEST_PROTOCOL)
    @staticmethod
    def load(fname):
"""
        Load the object from `fname` using the pickle module.

        :param str fname: the name of the file from which the reduced order
            model is loaded.
        :return: the loaded `ReducedOrderModel`.
        :rtype: ReducedOrderModel

        :Example:
>>> from ezyrb import ReducedOrderModel as ROM
>>> rom = ROM.load('ezyrb.rom')
>>> rom.predict(new_param)
"""
        with open(fname, 'rb') as input_file:
            rom = pickle.load(input_file)
return rom
    def test_error(self, test, norm=np.linalg.norm):
        """
        Compute the mean norm of the relative error vectors of the predicted
        test snapshots.

        :param database.Database test: the input test database.
        :param function norm: the function used to compute the norm of the
            error vectors. It has to accept a `numpy.ndarray` and the `axis`
            keyword argument, like `numpy.linalg.norm`. Default value is the
            L2 norm.
        :return: the mean norm of the relative errors of the estimated test
            snapshots.
        :rtype: float
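
        :Example:

            A minimal sketch (``rom`` is a fitted model and ``test_db`` is
            assumed to be an already defined `Database` of unseen
            snapshots):

            >>> rom.test_error(test_db)  # mean relative error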
"""
predicted_test = self.predict(test.parameters_matrix)
return np.mean(
norm(predicted_test.snapshots_matrix - test.snapshots_matrix,
axis=1) / norm(test.snapshots_matrix, axis=1))
    def kfold_cv_error(self, n_splits, *args, norm=np.linalg.norm, **kwargs):
        r"""
        Split the database into `n_splits` consecutive folds (without
        shuffling). Each fold is used once as the validation set, while the
        remaining folds form the training set. If `n_splits` is equal to the
        number of snapshots, this is equivalent to `loo_error`.
:param int n_splits: number of folds. Must be at least 2.
:param function norm: function to apply to compute the relative error
between the true snapshot and the predicted one.
Default value is the L2 norm.
:param \*args: additional parameters to pass to the `fit` method.
:param \**kwargs: additional parameters to pass to the `fit` method.
:return: the vector containing the errors corresponding to each fold.
:rtype: numpy.ndarray
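
        :Example:

            A minimal sketch (``rom`` is a `ReducedOrderModel` built on an
            already defined database):

            >>> errors = rom.kfold_cv_error(n_splits=5)
            >>> errors.mean()  # average relative error over the folds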
"""
error = []
kf = KFold(n_splits=n_splits)
for train_index, test_index in kf.split(self.database):
            new_db = self.database[train_index]
            # propagate the plugins so the fold model behaves like `self`
            rom = type(self)(new_db, copy.deepcopy(self.reduction),
                             copy.deepcopy(self.approximation),
                             plugins=copy.deepcopy(self.plugins)).fit(
                                 *args, **kwargs)
            error.append(rom.test_error(self.database[test_index], norm))
return np.array(error)
    def loo_error(self, *args, norm=np.linalg.norm, **kwargs):
        r"""
        Estimate the approximation error using the *leave-one-out* strategy.
        The main idea is to create several reduced spaces by combining all
        the snapshots except one. The error vector is computed as the
        difference between the removed snapshot and the projection onto the
        corresponding reduced space. The procedure is repeated for each
        snapshot in the database. The `norm` is applied to each error vector
        to obtain a float.

        :param function norm: the function used to compute the norm of the
            error vectors. It has to accept a `numpy.ndarray` and the `axis`
            keyword argument, like `numpy.linalg.norm`. Default value is the
            L2 norm.
:param \*args: additional parameters to pass to the `fit` method.
:param \**kwargs: additional parameters to pass to the `fit` method.
:return: the vector that contains the errors estimated for all
parametric points.
:rtype: numpy.ndarray
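
        :Example:

            A minimal sketch (``rom`` is a `ReducedOrderModel` built on an
            already defined database):

            >>> errors = rom.loo_error()
            >>> errors.argmax()  # index of the worst approximated snapshot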
"""
        error = np.zeros(len(self.database))
        for j in range(len(self.database)):
            indices = np.full(len(self.database), True)
            indices[j] = False
            new_db = self.database[indices]
            test_db = self.database[~indices]
            # propagate the plugins so the sub-model behaves like `self`
            rom = type(self)(new_db, copy.deepcopy(self.reduction),
                             copy.deepcopy(self.approximation),
                             plugins=copy.deepcopy(self.plugins)).fit(
                                 *args, **kwargs)
            error[j] = rom.test_error(test_db, norm)

        return error
    def optimal_mu(self, error=None, k=1):
        """
        Return the parametric points where new high-fidelity solutions have
        to be computed in order to globally reduce the estimated error.
        These points are the error-weighted barycenters of the simplices
        with the highest estimated error.
:param numpy.ndarray error: the estimated error evaluated for each
snapshot; if error array is not passed, it is computed using
:func:`loo_error` with the default function. Default value is None.
:param int k: the number of optimal points to return. Default value is
1.
:return: the optimal points
:rtype: numpy.ndarray
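
        :Example:

            A minimal sketch (``rom`` is a fitted `ReducedOrderModel` whose
            database contains enough points to build a Delaunay
            triangulation of the parameter space):

            >>> new_points = rom.optimal_mu(k=2)  # two new parametric points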
"""
if error is None:
error = self.loo_error()
mu = self.database.parameters_matrix
tria = Delaunay(mu)
error_on_simplex = np.array([
np.sum(error[smpx]) * self._simplex_volume(mu[smpx])
for smpx in tria.simplices
])
barycentric_point = []
for index in np.argpartition(error_on_simplex, -k)[-k:]:
worst_tria_pts = mu[tria.simplices[index]]
worst_tria_err = error[tria.simplices[index]]
barycentric_point.append(
np.average(worst_tria_pts, axis=0, weights=worst_tria_err))
return np.asarray(barycentric_point)
    def _simplex_volume(self, vertices):
        """
        Compute the volume of an N-dimensional simplex.

        Source from: `wikipedia.org/wiki/Simplex
        <https://en.wikipedia.org/wiki/Simplex>`_.

        :param numpy.ndarray vertices: (N+1)xN array containing the
            parameter values at the vertices of the simplex, where N is the
            dimensionality of the parameter space.
        :return: the N-dimensional volume of the simplex.
        :rtype: float
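
        :Example:

            A minimal sketch (``rom`` is any `ReducedOrderModel` instance;
            the unit right triangle in 2D has area 0.5):

            >>> import numpy as np
            >>> rom._simplex_volume(np.array([[0, 0], [1, 0], [0, 1]]))
            0.5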
"""
distance = np.transpose([vertices[0] - vi for vi in vertices[1:]])
return np.abs(
np.linalg.det(distance) / math.factorial(vertices.shape[1]))