Source code for athena.nll

"""
Module for Nonlinear Level-set Learning (NLL).

:References:

    - Guannan Zhang, Jiaxin Zhang, Jacob Hinkle.
      Learning nonlinear level sets for dimensionality reduction in function
      approximation. NeurIPS 2019, 13199-13208.
      arxiv: https://arxiv.org/abs/1902.10652

"""
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

# Define the overall data type
torch.set_default_tensor_type(torch.DoubleTensor)


[docs]class NonlinearLevelSet(): """Nonlinear Level Set class. It is implemented as a Reversible neural networks (RevNet). :param int n_layers: number of layers of the RevNet. :param int active_dim: number of active dimensions. :param float lr: learning rate. :param int epochs: number of ephocs. :param float dh: so-called time step of the RevNet. Default is 0.25. :param `torch.optim.Optimizer` optimizer: optimizer used in the training of the RevNet. Its argument are passed in the dict optim_args when :py:meth:`train` is called. :param `torch.optim.lr_scheduler._LRScheduler` scheduler: scheduler used in the training of the RevNet. Its argument are passed in the dict scheduler_args when :py:meth:`train` is called. Default is None. :cvar `BackwardNet` backward: backward net of the RevNet. See :class:`BackwardNet` class in :py:mod:`nll` module. :cvar `ForwardNet` forward: forward net of the RevNet. See :class:`ForwardNet` class in :py:mod:`nll` module. :cvar list loss_vec: list containg the loss at every epoch. """ def __init__(self, n_layers, active_dim, lr, epochs, dh=0.25, optimizer=optim.Adam, scheduler=None): self.n_layers = n_layers self.active_dim = active_dim self.lr = lr self.epochs = epochs self.dh = dh if issubclass(optimizer, optim.Optimizer): self.optimizer = optimizer if scheduler and issubclass(scheduler, torch.optim.lr_scheduler._LRScheduler): self.scheduler = scheduler else: self.scheduler = None self.backward = None self.forward = None self.loss_vec = []
[docs] def train(self, inputs, gradients, outputs=None, interactive=False, target_loss=0.0001, optim_args=None, scheduler_args=None): """ Train the whole RevNet. :param torch.Tensor inputs: DoubleTensor n_samples-by-n_params containing the points in the full input space. :param torch.Tensor gradients: DoubleTensor n_samples-by-n_params containing the gradient samples wrt the input parameters. :param numpy.ndarray outputs: array n_samples-by-1 containing the corresponding function evaluations. Needed only for the interactive mode. Default is None. :param bool interactive: if True a plot with the loss function decay, and the sufficient summary plot will be showed and updated every 10 epochs, and at the last epoch. Default is False. :param float target_loss: loss threshold. Default is 0.0001. :param dict optim_args: dictionary passed to the optimizer. :param dict scheduler_args: dictionary passed to the scheduler. :raises: ValueError: in interactive mode outputs must be provided for the sufficient summary plot. """ if optim_args is None: optim_args = {} if scheduler_args is None: scheduler_args = {} if inputs.shape[1] % 2 != 0: raise ValueError('The parameter space\'s dimension must be even.') if interactive: if outputs is None: raise ValueError( 'outputs in interactive mode have to be provided!') fig = plt.figure(figsize=(12, 5)) ax1 = fig.add_subplot(1, 2, 1) ax2 = fig.add_subplot(1, 2, 2) ax2.set_title('Loss function decay') ax2.grid(linestyle='dotted') plt.ion() plt.show() # Build the forward network self.forward = ForwardNet(n_params=inputs.shape[1], n_layers=self.n_layers, dh=self.dh, active_dim=self.active_dim) # Initialize the gradient self.forward.zero_grad() optimizer = self.optimizer(self.forward.parameters(), self.lr, **optim_args) # Initialize scheduler if present if self.scheduler: sched = self.scheduler(optimizer, **scheduler_args) # Training for i in range(self.epochs): optimizer.zero_grad() mapped_inputs = self.forward(inputs) loss = self.forward.customized_loss(inputs, mapped_inputs, gradients) if i % 10 == 0 or i == self.epochs - 1: print(f'epoch = {i}, loss = {loss}') if interactive: ax1.cla() ax1.set_title('Sufficient summary plot') ax1.plot(mapped_inputs.detach().numpy()[:, 0], outputs, 'bo') ax1.grid(linestyle='dotted') ax2.plot(i, loss.detach().numpy(), 'ro') plt.draw() plt.pause(0.00001) self.loss_vec.append(loss.detach().numpy()) # Build the inverse network based on the trained forward network self.backward = BackwardNet(n_params=inputs.shape[1], n_layers=self.n_layers, dh=self.dh) self.backward.zero_grad() for j in range(self.backward.n_layers): name_y = f'fc{j + 1}_y' name_z = f'fc{j + 1}_z' getattr(self.backward, name_y).weight = torch.nn.Parameter( getattr(self.forward, name_y).weight) getattr(self.backward, name_z).weight = torch.nn.Parameter( getattr(self.forward, name_z).weight) getattr(self.backward, name_y).bias = torch.nn.Parameter( getattr(self.forward, name_y).bias) getattr(self.backward, name_z).bias = torch.nn.Parameter( getattr(self.forward, name_z).bias) if torch.mean( torch.abs( torch.add(-1 * inputs, self.backward( self.forward(inputs))))) > 1e-5: print('self.backward is wrong!') print( torch.mean( torch.abs( torch.add(-1 * inputs, self.backward( self.forward(inputs)))))) if loss < target_loss: break loss.backward() optimizer.step() if self.scheduler: sched.step() if interactive: plt.ioff() plt.show()
[docs] def plot_sufficient_summary(self, inputs, outputs, filename=None, figsize=(10, 8), title=''): """ Plot the sufficient summary. :param torch.Tensor inputs: DoubleTensor n_samples-by-n_params containing the points in the full input space. :param numpy.ndarray outputs: array n_samples-by-1 containing the corresponding function evaluations. :param str filename: if specified, the plot is saved at `filename`. :param tuple(int,int) figsize: tuple in inches defining the figure size. Defaults to (10, 8). :param str title: title of the plot. :raises: ValueError .. warning:: Plot only available for active dimensions up to 1. """ plt.figure(figsize=figsize) plt.title(title) if self.active_dim == 1: reduced_inputs = self.forward(inputs)[:, 0] plt.plot(reduced_inputs.detach().numpy(), outputs, 'bo') plt.xlabel('Reduced input') plt.ylabel('Output') else: raise ValueError( 'Sufficient summary plots cannot be made in more than 1 ' \ 'dimension.' ) plt.grid(linestyle='dotted') if filename: plt.savefig(filename) else: plt.show()
[docs] def plot_loss(self, filename=None, figsize=(10, 8), title=''): """ Plot the loss function decay. :param str filename: if specified, the plot is saved at `filename`. :param tuple(int,int) figsize: tuple in inches defining the figure size. Defaults to (10, 8). :param str title: title of the plot. """ plt.figure(figsize=figsize) plt.title(title) x_range = list(range(1, self.epochs + 1, 10)) + [self.epochs] plt.plot(x_range, self.loss_vec, 'b-') plt.xlabel('Epoch') plt.ylabel('Loss') plt.grid(linestyle='dotted') if filename: plt.savefig(filename) else: plt.show()
[docs] def save_forward(self, outfile): """ Save the forward map for future inference. :param str outfile: filename of the net to save. Use either .pt or .pth. See notes below. .. note:: A common PyTorch convention is to save models using either a .pt or .pth file extension. """ torch.save(self.forward.state_dict(), outfile)
[docs] def load_forward(self, infile, n_params): """ Load the forward map for inference. :param str infile: filename of the saved net to load. See notes below. :param int n_params: number of input parameters. .. note:: A common PyTorch convention is to save models using either a .pt or .pth file extension. """ self.forward = ForwardNet(n_params, self.n_layers, self.dh, self.active_dim) self.forward.load_state_dict(torch.load(infile)) self.forward.eval()
[docs] def save_backward(self, outfile): """ Save the backward map for future inference. :param str outfile: filename of the net to save. Use either .pt or .pth. See notes below. .. note:: A common PyTorch convention is to save models using either a .pt or .pth file extension. """ torch.save(self.backward.state_dict(), outfile)
[docs] def load_backward(self, infile, n_params): """ Load the backward map for inference. :param str infile: filename of the saved net to load. See notes below. :param int n_params: number of input parameters. .. note:: A common PyTorch convention is to save models using either a .pt or .pth file extension. """ self.backward = BackwardNet(n_params, self.n_layers, self.dh) self.backward.load_state_dict(torch.load(infile)) self.backward.eval()
[docs]class ForwardNet(nn.Module): """ Forward net class. It is part of the RevNet. :param int n_params: number of input parameters. :param int n_layers: number of layers of the RevNet. :param float dh: so-called time step of the RevNet. :param int active_dim: number of active dimensions. :cvar slice omega: a slice object indicating the active dimension to keep. For example to keep the first two dimension `omega = slice(2)`. It is automatically set with `active_dim`. """ def __init__(self, n_params, n_layers, dh, active_dim): super().__init__() self.n_params = n_params // 2 self.n_layers = n_layers self.dh = dh self.omega = slice(active_dim) for i in range(self.n_layers): setattr(self, f'fc{i + 1}_y', nn.Linear(self.n_params, 2 * self.n_params)) setattr(self, f'fc{i + 1}_z', nn.Linear(self.n_params, 2 * self.n_params))
[docs] def forward(self, inputs): """ Maps original inputs to transformed inputs. :param torch.Tensor inputs: DoubleTensor n_samples-by-n_params containing the points in the original full input space. :return mapped_inputs: DoubleTensor n_samples-by-n_params with the nonlinear transformed inputs. :rtype: torch.Tensor """ bb = torch.split(inputs, self.n_params, dim=1) vars()['var0_y'] = torch.clone(bb[0]) vars()['var0_z'] = torch.clone(bb[1]) for i in range(self.n_layers): name_y = f'fc{i + 1}_y' name_z = f'fc{i + 1}_z' var_y0 = f'var{i}_y' var_z0 = f'var{i}_z' var_y1 = f'var{i + 1}_y' var_z1 = f'var{i + 1}_z' sig_y = torch.unsqueeze( torch.tanh(getattr(self, name_y)(vars()[var_z0])), 2) K_y = torch.transpose(getattr(self, name_y).weight, 0, 1) vars()[var_y1] = torch.add( vars()[var_y0], 1 * self.dh * torch.squeeze(torch.matmul(K_y, sig_y), 2)) sig_z = torch.unsqueeze( torch.tanh(getattr(self, name_z)(vars()[var_y1])), 2) K_z = torch.transpose(getattr(self, name_z).weight, 0, 1) vars()[var_z1] = torch.add( vars()[var_z0], -1 * self.dh * torch.squeeze(torch.matmul(K_z, sig_z), 2)) return torch.cat((vars()[var_y1], vars()[var_z1]), 1)
[docs] def customized_loss(self, inputs, mapped_inputs, gradients): """ Custom loss function. :param torch.Tensor inputs: DoubleTensor n_samples-by-n_params containing the points in the full input space. :param torch.Tensor mapped_inputs: DoubleTensor n_samples-by-n_params containing the mapped points in the full input space. They are the result of the forward application. :param torch.Tensor gradients: DoubleTensor n_samples-by-n_params containing the gradient samples wrt the input parameters. """ # Define the weights and bias of the inverse network for i in range(self.n_layers): name_y = f'fc{i + 1}_y' name_z = f'fc{i + 1}_z' inv_name_y_weight = f'inv_fc{i + 1}_y_weight' inv_name_z_weight = f'inv_fc{i + 1}_z_weight' vars()[inv_name_y_weight] = getattr(self, name_y).weight vars()[inv_name_z_weight] = getattr(self, name_z).weight inv_name_y_bias = f'inv_fc{i + 1}_y_bias' inv_name_z_bias = f'inv_fc{i + 1}_z_bias' vars()[inv_name_y_bias] = getattr(self, name_y).bias vars()[inv_name_z_bias] = getattr(self, name_z).bias Jacob = torch.empty(inputs.size()[0], 2 * self.n_params, 2 * self.n_params) for j in range(2 * self.n_params): output_dy = torch.clone(mapped_inputs) output_dy[:, j] += 0.001 bb = torch.split(output_dy, self.n_params, dim=1) var_y0 = f'var{self.n_layers - 1}_y' var_z0 = f'var{self.n_layers - 1}_z' vars()[var_y0] = bb[0] vars()[var_z0] = bb[1] for i in range(self.n_layers - 1, -1, -1): inv_name_y_weight = f'inv_fc{i + 1}_y_weight' inv_name_z_weight = f'inv_fc{i + 1}_z_weight' inv_name_y_bias = f'inv_fc{i + 1}_y_bias' inv_name_z_bias = f'inv_fc{i + 1}_z_bias' var_y0 = f'var{i}_y' var_z0 = f'var{i}_z' var_y1 = f'var{i - 1}_y' var_z1 = f'var{i - 1}_z' sig_z = torch.tanh( torch.add( torch.matmul(vars()[inv_name_z_weight], torch.unsqueeze(vars()[var_y0], 2)), torch.unsqueeze(vars()[inv_name_z_bias], 1))) K_z = torch.transpose(vars()[inv_name_z_weight], 0, 1) vars()[var_z1] = torch.add( vars()[var_z0], 1 * self.dh * torch.squeeze(torch.matmul(K_z, sig_z), 2)) sig_y = torch.tanh( torch.add( torch.matmul(vars()[inv_name_y_weight], torch.unsqueeze(vars()[var_z1], 2)), torch.unsqueeze(vars()[inv_name_y_bias], 1))) K_y = torch.transpose(vars()[inv_name_y_weight], 0, 1) vars()[var_y1] = torch.add( vars()[var_y0], -1 * self.dh * torch.squeeze(torch.matmul(K_y, sig_y), 2)) dx = torch.cat((vars()[var_y1], vars()[var_z1]), 1) # # Test the invertibility # # Warning: it is computational intense # if torch.mean(torch.abs(torch.add(-1 * output_dy, # self.forward(dx)))) > 1e-5: # print('Something is wrong in Jacobian computation') # print(torch.mean( # torch.abs(torch.add(-1 * output_dy, self.forward(dx))))) for k in range(2 * self.n_params): Jacob[:, j, k] = torch.add(dx[:, k], -1 * inputs[:, k]) JJ2 = torch.unsqueeze(torch.sqrt(torch.sum(torch.mul(Jacob, Jacob), 2)), 2) JJJ = torch.div(Jacob, JJ2.expand(-1, -1, 2 * self.n_params)) ex_data = torch.unsqueeze(gradients, 2) loss_weights = torch.clone(torch.squeeze(torch.matmul(JJJ, ex_data), 2)) # anisotropy weigths loss_weights[:, self.omega] = 0.0 loss_anisotropy = torch.sqrt( torch.mean(torch.sum(torch.mul(loss_weights, loss_weights), 1))) J_det = torch.empty(inputs.shape[0]) for k in range(inputs.shape[0]): eee = torch.svd(JJJ[k, :, :])[1] J_det[k] = torch.prod(eee) loss_det = torch.abs(torch.prod(J_det - 1.0)) return loss_anisotropy + loss_det
[docs]class BackwardNet(nn.Module): """Backward Net class. It is part of the RevNet. :param int n_params: number of input parameters. :param int n_layers: number of layers of the RevNet. :param float dh: so-called time step of the RevNet. """ def __init__(self, n_params, n_layers, dh): super().__init__() self.n_params = n_params // 2 self.n_layers = n_layers self.dh = dh for i in range(self.n_layers): setattr(self, f'fc{i + 1}_y', nn.Linear(self.n_params, 2 * self.n_params)) setattr(self, f'fc{i + 1}_z', nn.Linear(self.n_params, 2 * self.n_params))
[docs] def forward(self, mapped_inputs): """ Maps transformed inputs to original inputs. :param torch.Tensor mapped_inputs: DoubleTensor n_samples-by-n_params containing the nonlinear transformed inputs. :return inputs: DoubleTensor n_samples-by-n_params with the points in the original full input space. :rtype: torch.Tensor """ y, z = torch.split(mapped_inputs, self.n_params, dim=1) for i in range(self.n_layers - 1, -1, -1): name_y = f'fc{i + 1}_y' name_z = f'fc{i + 1}_z' sig_z = torch.unsqueeze(torch.tanh(getattr(self, name_z)(y)), 2) K_z = torch.transpose(getattr(self, name_z).weight, 0, 1) z = torch.add( z, 1 * self.dh * torch.squeeze(torch.matmul(K_z, sig_z), 2)) sig_y = torch.unsqueeze(torch.tanh(getattr(self, name_y)(z)), 2) K_y = torch.transpose(getattr(self, name_y).weight, 0, 1) y = torch.add( y, -1 * self.dh * torch.squeeze(torch.matmul(K_y, sig_y), 2)) return torch.cat((y, z), 1)