"""
Derived module from dmdbase.py for hankel dmd.
Reference:
- H. Arbabi, I. Mezic, Ergodic theory, dynamic mode decomposition, and
computation of spectral properties of the Koopman operator. SIAM Journal on
Applied Dynamical Systems, 2017, 16.4: 2096-2126.
"""
from copy import copy
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view as swv
from .dmdbase import DMDBase
from .dmd import DMD
[docs]class HankelDMD(DMDBase):
"""
Hankel Dynamic Mode Decomposition
:param svd_rank: the rank for the truncation; If 0, the method computes the
optimal rank and uses it for truncation; if positive interger, the
method uses the argument for the truncation; if float between 0 and 1,
the rank is the number of the biggest singular values that are needed
to reach the 'energy' specified by `svd_rank`; if -1, the method does
not compute truncation.
:type svd_rank: int or float
:param int tlsq_rank: rank truncation computing Total Least Square. Default
is 0, that means no truncation.
:param bool exact: flag to compute either exact DMD or projected DMD.
Default is False.
:param opt: argument to control the computation of DMD modes amplitudes.
See :class:`DMDBase`. Default is False.
:type opt: bool or int
:param rescale_mode: Scale Atilde as shown in
10.1016/j.jneumeth.2015.10.010 (section 2.4) before computing its
eigendecomposition. None means no rescaling, 'auto' means automatic
rescaling using singular values, otherwise the scaling factors.
:type rescale_mode: {'auto'} or None or numpy.ndarray
:param bool forward_backward: If True, the low-rank operator is computed
like in fbDMD (reference: https://arxiv.org/abs/1507.02264). Default is
False.
:param int d: the new order for spatial dimension of the input snapshots.
Default is 1.
:param sorted_eigs: Sort eigenvalues (and modes/dynamics accordingly) by
magnitude if `sorted_eigs='abs'`, by real part (and then by imaginary
part to break ties) if `sorted_eigs='real'`. Default: False.
:type sorted_eigs: {'real', 'abs'} or False
:param reconstruction_method: Method used to reconstruct the snapshots of
the dynamical system from the multiple versions available due to how
HankelDMD is conceived. If `'first'` (default) the first version
available is selected (i.e. the nearest to the 0-th row in the
augmented matrix). If `'mean'` we compute the element-wise mean. If
`reconstruction_method` is an array of float values we compute the
weighted average (for each snapshots) using the given values as weights
(the number of weights must be equal to `d`).
:type reconstruction_method: {'first', 'mean'} or array-like
"""
def __init__(
self,
svd_rank=0,
tlsq_rank=0,
exact=False,
opt=False,
rescale_mode=None,
forward_backward=False,
d=1,
sorted_eigs=False,
reconstruction_method="first",
):
super().__init__(
svd_rank=svd_rank,
tlsq_rank=tlsq_rank,
exact=exact,
opt=opt,
rescale_mode=rescale_mode,
sorted_eigs=sorted_eigs,
)
self._d = d
if isinstance(reconstruction_method, list):
if len(reconstruction_method) != d:
raise ValueError(
"The length of the array of weights must be equal to d"
)
elif isinstance(reconstruction_method, np.ndarray):
if (
reconstruction_method.ndim > 1
or reconstruction_method.shape[0] != d
):
raise ValueError(
"The length of the array of weights must be equal to d"
)
self._reconstruction_method = reconstruction_method
self._sub_dmd = DMD(
svd_rank=svd_rank,
tlsq_rank=tlsq_rank,
exact=exact,
opt=opt,
rescale_mode=rescale_mode,
forward_backward=forward_backward,
sorted_eigs=sorted_eigs,
)
@property
def d(self):
"""The new order for spatial dimension of the input snapshots."""
return self._d
[docs] def _hankel_first_occurrence(self, time):
r"""
For a given `t` such that there is :math:`k \in \mathbb{N}` such that
:math:`t = t_0 + k dt`, return the index of the first column in Hankel
pseudo matrix (see also :func:`_pseudo_hankel_matrix`) which contains
the snapshot corresponding to `t`.
:param time: The time corresponding to the requested snapshot.
:return: The index of the first appeareance of `time` in the columns of
Hankel pseudo matrix.
:rtype: int
"""
return max(
0,
(time - self.original_time["t0"]) // self.dmd_time["dt"]
- (self.original_time["t0"] + self.d - 1),
)
[docs] def _update_sub_dmd_time(self):
"""
Update the time dictionaries (`dmd_time` and `original_time`) of
the auxiliary DMD instance `HankelDMD._sub_dmd` after an update of the
time dictionaries of the time dictionaries of this instance of the
higher level instance of `HankelDMD`.
"""
self._sub_dmd.dmd_time["t0"] = self._hankel_first_occurrence(
self.dmd_time["t0"]
)
self._sub_dmd.dmd_time["tend"] = self._hankel_first_occurrence(
self.dmd_time["tend"]
)
[docs] def reconstructions_of_timeindex(self, timeindex=None):
"""
Build a collection of all the available versions of the given
`timeindex`. The indexing of time instants is the same used for
:func:`reconstructed_data`. For each time instant there are at least
one and at most `d` versions. If `timeindex` is `None` the function
returns the whole collection, for all the time instants.
:param int timeindex: The index of the time snapshot.
:return: a collection of all the available versions for the given
time snapshot, or for all the time snapshots if `timeindex` is
`None` (in the second case, time varies along the first dimension
of the array returned).
:rtype: numpy.ndarray or list
"""
self._update_sub_dmd_time()
rec = self._sub_dmd.reconstructed_data
space_dim = rec.shape[0] // self.d
time_instants = rec.shape[1] + self.d - 1
# for each time instance, we collect all its appearences. each
# snapshot appears at most d times (for instance, the first appears
# only once).
reconstructed_snapshots = np.full(
(time_instants, self.d, space_dim), np.nan, dtype=rec.dtype
)
c_idxes = (
np.array(range(self.d))[:, None]
.repeat(2, axis=1)[None, :]
.repeat(rec.shape[1], axis=0)
)
c_idxes[:, :, 0] += np.array(range(rec.shape[1]))[:, None]
reconstructed_snapshots[c_idxes[:, :, 0], c_idxes[:, :, 1]] = np.array(
np.swapaxes(np.split(rec.T, self.d, axis=1), 0, 1)
)
if timeindex is None:
return reconstructed_snapshots
return reconstructed_snapshots[timeindex]
[docs] def _first_reconstructions(self, reconstructions):
"""Return the first occurrence of each snapshot available in the given
matrix (which must be the result of `self._sub_dmd.reconstructed_data`,
or have the same shape).
:param reconstructions: A matrix of (higher-order) snapshots having
shape `(space*self.d, time_instants)`
:type reconstructions: np.ndarray
:return: The first snapshot that occurs in `reconstructions` for each
available time instant.
:rtype: np.ndarray
"""
first_nonmasked_idx = np.repeat(
np.array(range(reconstructions.shape[0]))[:, None], 2, axis=1
)
first_nonmasked_idx[self.d - 1 :, 1] = self.d - 1
return reconstructions[
first_nonmasked_idx[:, 0], first_nonmasked_idx[:, 1]
].T
@property
def reconstructed_data(self):
self._update_sub_dmd_time()
rec = self.reconstructions_of_timeindex()
rec = np.ma.array(rec, mask=np.isnan(rec))
if self._reconstruction_method == "first":
result = self._first_reconstructions(rec)
elif self._reconstruction_method == "mean":
result = np.mean(rec, axis=1).T
elif isinstance(self._reconstruction_method, (np.ndarray, list)):
result = np.average(
rec, axis=1, weights=self._reconstruction_method
).T
else:
raise ValueError(
"The reconstruction method wasn't recognized: {}".format(
self._reconstruction_method
)
)
# we want to return only the requested timesteps
time_index = min(
self.d - 1,
int(
(self.dmd_time["t0"] - self.original_time["t0"])
// self.dmd_time["dt"]
),
)
result = result[:, time_index : time_index + len(self.dmd_timesteps)]
return result.filled(fill_value=0)
[docs] def _pseudo_hankel_matrix(self, X):
"""
Arrange the snapshot in the matrix `X` into the (pseudo) Hankel
matrix. The attribute `d` controls the number of snapshot from `X` in
each snapshot of the Hankel matrix.
:Example:
>>> from pydmd import HankelDMD
>>> import numpy as np
>>> dmd = HankelDMD(d=2)
>>> a = np.array([[1, 2, 3, 4, 5]])
>>> dmd._pseudo_hankel_matrix(a)
array([[1, 2, 3, 4],
[2, 3, 4, 5]])
>>> dmd = HankelDMD(d=4)
>>> dmd._pseudo_hankel_matrix(a)
array([[1, 2],
[2, 3],
[3, 4],
[4, 5]])
>>> dmd = HankelDMD(d=2)
>>> a = np.array([1,2,3,4,5,6]).reshape(2,3)
>>> a
array([[1, 2, 3],
[4, 5, 6]])
>>> dmd._pseudo_hankel_matrix(a)
array([[1, 2],
[4, 5],
[2, 3],
[5, 6]])
"""
return (
swv(X.T, (self.d, X.shape[0]))[:, 0]
.reshape(X.shape[1] - self.d + 1, -1)
.T
)
@property
def modes(self):
return self._sub_dmd.modes
@property
def eigs(self):
return self._sub_dmd.eigs
@property
def amplitudes(self):
return self._sub_dmd.amplitudes
@property
def operator(self):
return self._sub_dmd.operator
@property
def svd_rank(self):
return self._sub_dmd.svd_rank
@property
def modes_activation_bitmask(self):
return self._sub_dmd.modes_activation_bitmask
@modes_activation_bitmask.setter
def modes_activation_bitmask(self, value):
self._sub_dmd.modes_activation_bitmask = value
# due to how we implemented HankelDMD we need an alternative implementation
# of __getitem__
def __getitem__(self, key):
"""
Restrict the DMD modes used by this instance to a subset of indexes
specified by keys. The value returned is a shallow copy of this DMD
instance, with a different value in :func:`modes_activation_bitmask`.
Therefore assignments to attributes are not reflected into the original
instance.
However the DMD instance returned should not be used for low-level
manipulations on DMD modes, since the underlying DMD operator is shared
with the original instance. For this reasons modifications to NumPy
arrays may result in unwanted and unspecified situations which should
be avoided in principle.
:param key: An index (integer), slice or list of indexes.
:type key: int or slice or list or np.ndarray
:return: A shallow copy of this DMD instance having only a subset of
DMD modes which are those indexed by `key`.
:rtype: HankelDMD
"""
sub_dmd_copy = copy(self._sub_dmd)
sub_dmd_copy.allocate_proxy()
shallow_copy = copy(self)
shallow_copy._sub_dmd = sub_dmd_copy
return DMDBase.__getitem__(shallow_copy, key)
[docs] def fit(self, X):
"""
Compute the Dynamic Modes Decomposition to the input data.
:param X: the input snapshots.
:type X: numpy.ndarray or iterable
"""
if isinstance(X, np.ndarray) and X.ndim == 2:
n_samples = X.shape[1]
else:
n_samples = len(X)
if n_samples < self._d:
msg = """The number of snapshots provided is not enough for d={}.
Expected at least d."""
raise ValueError(msg.format(self._d))
snp, self._snapshots_shape = self._col_major_2darray(X)
self._snapshots = self._pseudo_hankel_matrix(snp)
self._sub_dmd.fit(self._snapshots)
# Default timesteps
self._set_initial_time_dictionary(
{"t0": 0, "tend": n_samples - 1, "dt": 1}
)
return self