""" Module for AbstractProblem class """
from abc import ABCMeta, abstractmethod
from ..utils import merge_tensors, check_consistency
from copy import deepcopy
import torch
class AbstractProblem(metaclass=ABCMeta):
"""
    The abstract `AbstractProblem` class. All classes defining a PINA problem
    should inherit from this class.
In the definition of a PINA problem, the fundamental elements are:
the output variables, the condition(s), and the domain(s) where the
conditions are applied.
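
    :Example:
        A minimal sketch of a concrete problem definition. The names
        ``SpatialProblem``, ``CartesianDomain``, ``Condition`` and
        ``FixedValue`` are assumed to be provided elsewhere in the package,
        and the exact import paths may differ between PINA versions.

        >>> from pina import Condition
        >>> from pina.geometry import CartesianDomain
        >>> from pina.problem import SpatialProblem
        >>> from pina.equation import FixedValue
        >>> class SimpleProblem(SpatialProblem):
        ...     output_variables = ['u']
        ...     spatial_domain = CartesianDomain({'x': [0, 1]})
        ...     conditions = {
        ...         'boundary': Condition(
        ...             location=CartesianDomain({'x': 0}),
        ...             equation=FixedValue(0.0))}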
"""
def __init__(self):
# variable storing all points
self.input_pts = {}
        # variable to check if sampling is done. If no location
        # element is present in the Condition, this variable is set to True
self._have_sampled_points = {}
for condition_name in self.conditions:
self._have_sampled_points[condition_name] = False
# put in self.input_pts all the points that we don't need to sample
self._span_condition_points()
def __deepcopy__(self, memo):
"""
Implements deepcopy for the
:class:`~pina.problem.abstract_problem.AbstractProblem` class.
        :param dict memo: Memory dictionary, to avoid excessive copying.
:return: The deep copy of the
:class:`~pina.problem.abstract_problem.AbstractProblem` class
:rtype: AbstractProblem
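
        :Example:
            A hedged sketch (``problem`` is assumed to be an already
            instantiated concrete problem):

            >>> from copy import deepcopy
            >>> problem_copy = deepcopy(problem)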
"""
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
setattr(result, k, deepcopy(v, memo))
return result
@property
def input_variables(self):
"""
        The input variables of the AbstractProblem, collected from the
        spatial, temporal, parameter, and custom variables defined by the
        problem.

        :return: the input variables of ``self``
        :rtype: list
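
        :Example:
            A hedged illustration; the returned names depend on the
            specific problem definition:

            >>> problem.input_variables
            ['x', 'y']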
"""
variables = []
if hasattr(self, "spatial_variables"):
variables += self.spatial_variables
if hasattr(self, "temporal_variable"):
variables += self.temporal_variable
if hasattr(self, "parameters"):
variables += self.parameters
if hasattr(self, "custom_variables"):
variables += self.custom_variables
return variables
@property
def domain(self):
"""
        The domain(s) where the conditions of the AbstractProblem are valid.
        Domains of the same type are merged into a single domain, while
        mixing different domain types raises an error.

        :return: the domain of ``self``
        :rtype: Location
"""
domains = [
getattr(self, f"{t}_domain")
for t in ["spatial", "temporal", "parameter"]
if hasattr(self, f"{t}_domain")
]
if len(domains) == 1:
return domains[0]
elif len(domains) == 0:
            raise RuntimeError("no domain is defined for the problem")
if len(set(map(type, domains))) == 1:
domain = domains[0].__class__({})
            for d in domains:
                domain.update(d)
return domain
else:
raise RuntimeError("different domains")
@input_variables.setter
def input_variables(self, variables):
        raise RuntimeError("input variables can not be set directly")
@property
@abstractmethod
def output_variables(self):
"""
The output variables of the problem.
"""
pass
@property
@abstractmethod
def conditions(self):
"""
The conditions of the problem.
"""
pass
def _span_condition_points(self):
"""
        Store the points already provided by the conditions and, for inverse
        problems, initialize the unknown parameters from their domain.
"""
for condition_name in self.conditions:
condition = self.conditions[condition_name]
if hasattr(condition, "input_points"):
samples = condition.input_points
self.input_pts[condition_name] = samples
self._have_sampled_points[condition_name] = True
if hasattr(self, "unknown_parameter_domain"):
            # initialize the unknown parameters of the inverse problem
            # from the domain provided by the user
            self.unknown_parameters = {}
            for var in self.unknown_variables:
                range_var = self.unknown_parameter_domain.range_[var]
                # sample uniformly inside the given range [low, high]
                tensor_var = (
                    torch.rand(1, requires_grad=True)
                    * (range_var[1] - range_var[0])
                    + range_var[0]
                )
self.unknown_parameters[var] = torch.nn.Parameter(
tensor_var
)
def discretise_domain(
self, n, mode="random", variables="all", locations="all"
):
"""
        Generate a set of points to span the ``Location`` of all the
        conditions of the problem.

        :param n: Number of points to sample, see the warning below
            for reference.
        :type n: int
        :param mode: Mode for sampling, defaults to ``random``.
            Available modes include: random sampling, ``random``;
            latin hypercube sampling, ``latin`` or ``lh``;
            Chebyshev sampling, ``chebyshev``; grid sampling, ``grid``.
        :type mode: str
        :param variables: problem's variables to be sampled, defaults to 'all'.
        :type variables: str | list[str]
        :param locations: problem's locations from where to sample, defaults to 'all'.
        :type locations: str | list[str]

:Example:
>>> pinn.discretise_domain(n=10, mode='grid')
            >>> pinn.discretise_domain(n=10, mode='grid', locations=['bound1'])
>>> pinn.discretise_domain(n=10, mode='grid', variables=['x'])
.. warning::
``random`` is currently the only implemented ``mode`` for all geometries, i.e.
            ``EllipsoidDomain``, ``CartesianDomain``, ``SimplexDomain`` and the geometry
            compositions ``Union``, ``Difference``, ``Exclusion``, ``Intersection``. The
modes ``latin`` or ``lh``, ``chebyshev``, ``grid`` are only implemented for
``CartesianDomain``.
"""
        # check consistency of n
check_consistency(n, int)
# check consistency mode
check_consistency(mode, str)
if mode not in ["random", "grid", "lh", "chebyshev", "latin"]:
raise TypeError(f"mode {mode} not valid.")
# check consistency variables
if variables == "all":
variables = self.input_variables
else:
check_consistency(variables, str)
            if not all(v in self.input_variables for v in variables):
                raise TypeError(
                    "Wrong variables for sampling. Variables "
                    f"should be in {self.input_variables}."
                )
# check consistency location
locations_to_sample = [
condition
for condition in self.conditions
if hasattr(self.conditions[condition], "location")
]
if locations == "all":
# only locations that can be sampled
locations = locations_to_sample
else:
check_consistency(locations, str)
            if not all(loc in locations_to_sample for loc in locations):
                raise TypeError(
                    "Wrong locations for sampling. Locations "
                    f"should be in {locations_to_sample}."
                )
# sampling
for location in locations:
condition = self.conditions[location]
# we try to check if we have already sampled
try:
already_sampled = [self.input_pts[location]]
# if we have not sampled, a key error is thrown
except KeyError:
already_sampled = []
            # if the condition was already fully sampled but we want to
            # sample again, reset already_sampled to an empty list and
            # mark the condition as not sampled
if self._have_sampled_points[location]:
already_sampled = []
self._have_sampled_points[location] = False
# build samples
samples = [
condition.location.sample(n=n, mode=mode, variables=variables)
] + already_sampled
pts = merge_tensors(samples)
self.input_pts[location] = pts
# the condition is sampled if input_pts contains all labels
if sorted(self.input_pts[location].labels) == sorted(
self.input_variables
):
self._have_sampled_points[location] = True
self.input_pts[location] = self.input_pts[location].extract(
sorted(self.input_variables)
)
def add_points(self, new_points):
"""
        Add points to the ones already sampled.

        :param dict new_points: a dictionary whose keys are the locations
            where the points are added and whose values are the (labelled)
            tensors of points.
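
        :Example:
            A hedged sketch of adding extra points to a condition named
            ``bound``; ``LabelTensor`` is assumed to be the labelled tensor
            class used for the problem points, and the condition name is
            purely illustrative:

            >>> import torch
            >>> from pina import LabelTensor
            >>> extra_pts = LabelTensor(torch.rand(10, 2), ['x', 'y'])
            >>> problem.add_points({'bound': extra_pts})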
"""
        if not all(loc in self.conditions for loc in new_points.keys()):
            raise TypeError(
                "Wrong locations for new points. Locations "
                f"should be in {list(self.conditions.keys())}."
            )
for location in new_points.keys():
# extract old and new points
old_pts = self.input_pts[location]
new_pts = new_points[location]
            # error if old and new points do not share the same variables
            if sorted(old_pts.labels) != sorted(new_pts.labels):
                raise TypeError(
                    "Not matching variables for old and new points "
                    f"in condition {location}."
                )
if old_pts.labels != new_pts.labels:
new_pts = torch.hstack(
[new_pts.extract([i]) for i in old_pts.labels]
)
new_pts.labels = old_pts.labels
# merging
merged_pts = torch.vstack([old_pts, new_pts])
merged_pts.labels = old_pts.labels
self.input_pts[location] = merged_pts
@property
def have_sampled_points(self):
"""
Check if all points for
``Location`` are sampled.
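
        :return: ``True`` if the points of every condition have been
            sampled, ``False`` otherwise
        :rtype: bool

        :Example:
            A hedged illustration; the value depends on whether
            ``discretise_domain`` has already been called:

            >>> problem.have_sampled_points
            False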
"""
return all(self._have_sampled_points.values())
@property
def not_sampled_points(self):
"""
        Return the conditions whose points
        are not sampled (``None`` if all conditions are sampled).
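
        :Example:
            A hedged illustration; the condition names are hypothetical:

            >>> problem.not_sampled_points
            ['gamma_top', 'gamma_bottom']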
"""
        # conditions which are not sampled
        not_sampled = None
        if not self.have_sampled_points:
            not_sampled = [
                condition_name
                for condition_name, is_sampled in self._have_sampled_points.items()
                if not is_sampled
            ]
return not_sampled