# PointFlow/models/odefunc.py
import copy
import torch
import torch.nn as nn
from . import diffeq_layers

__all__ = ["ODEnet", "ODEfunc"]


def divergence_approx(f, y, e=None):
    # Hutchinson estimator: E_e[e^T (df/dy) e] is an unbiased estimate of tr(df/dy).
    e_dzdx = torch.autograd.grad(f, y, e, create_graph=True)[0]
    e_dzdx_e = e_dzdx.mul(e)
    cnt = 0
    while not e_dzdx_e.requires_grad and cnt < 10:
        # print("RequiresGrad:f=%s, y(rgrad)=%s, e_dzdx:%s, e:%s, e_dzdx_e:%s cnt=%d"
        #       % (f.requires_grad, y.requires_grad, e_dzdx.requires_grad,
        #          e.requires_grad, e_dzdx_e.requires_grad, cnt))
        e_dzdx = torch.autograd.grad(f, y, e, create_graph=True)[0]
        e_dzdx_e = e_dzdx * e
        cnt += 1

    approx_tr_dzdx = e_dzdx_e.sum(dim=-1)
    assert approx_tr_dzdx.requires_grad, \
        "(failed to add node to graph) f=%s %s, y(rgrad)=%s, e_dzdx:%s, e:%s, e_dzdx_e:%s cnt:%s" % (
            f.size(), f.requires_grad, y.requires_grad,
            e_dzdx.requires_grad, e.requires_grad, e_dzdx_e.requires_grad, cnt)
    return approx_tr_dzdx
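

# Call-pattern sketch (illustrative, not executed here): a CNF step draws fixed
# noise e = torch.randn_like(y) once per solver call and then computes
#   div = divergence_approx(dy, y, e=e)   # shape: (batch,)
# where dy was produced from y with y.requires_grad_(True), so that
# torch.autograd.grad can differentiate through it. ODEfunc.forward below
# follows exactly this pattern.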


class Swish(nn.Module):
    """Swish activation, x * sigmoid(beta * x), with a learnable slope beta."""

    def __init__(self):
        super(Swish, self).__init__()
        self.beta = nn.Parameter(torch.tensor(1.0))

    def forward(self, x):
        return x * torch.sigmoid(self.beta * x)


class Lambda(nn.Module):
    def __init__(self, f):
        super(Lambda, self).__init__()
        self.f = f

    def forward(self, x):
        return self.f(x)


NONLINEARITIES = {
    "tanh": nn.Tanh(),
    "relu": nn.ReLU(),
    "softplus": nn.Softplus(),
    "elu": nn.ELU(),
    "swish": Swish(),
    "square": Lambda(lambda x: x ** 2),
    "identity": Lambda(lambda x: x),
}


class ODEnet(nn.Module):
    """
    Helper class to make neural nets for use in continuous normalizing flows.
    """

    def __init__(self, hidden_dims, input_shape, context_dim, layer_type="concat", nonlinearity="softplus"):
        super(ODEnet, self).__init__()
        base_layer = {
            "ignore": diffeq_layers.IgnoreLinear,
            "squash": diffeq_layers.SquashLinear,
            "scale": diffeq_layers.ScaleLinear,
            "concat": diffeq_layers.ConcatLinear,
            "concat_v2": diffeq_layers.ConcatLinear_v2,
            "concatsquash": diffeq_layers.ConcatSquashLinear,
            "concatscale": diffeq_layers.ConcatScaleLinear,
        }[layer_type]

        # Build the hidden layers plus a final layer mapping back to the input dimension.
        layers = []
        activation_fns = []
        hidden_shape = input_shape
        for dim_out in (hidden_dims + (input_shape[0],)):
            layer_kwargs = {}
            layer = base_layer(hidden_shape[0], dim_out, context_dim, **layer_kwargs)
            layers.append(layer)
            activation_fns.append(NONLINEARITIES[nonlinearity])

            hidden_shape = list(copy.copy(hidden_shape))
            hidden_shape[0] = dim_out

        self.layers = nn.ModuleList(layers)
        self.activation_fns = nn.ModuleList(activation_fns[:-1])

    def forward(self, context, y):
        dx = y
        for l, layer in enumerate(self.layers):
            dx = layer(context, dx)
            # Apply the nonlinearity everywhere except after the last layer.
            if l < len(self.layers) - 1:
                dx = self.activation_fns[l](dx)
        return dx
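

# Construction sketch (dimensions are illustrative, not taken from the project
# config): ODEnet maps D-dimensional points to D-dimensional velocities, driven
# by a context vector. For the conditional path in ODEfunc.forward below, the
# scalar time t is concatenated onto the latent code before the net is called,
# so context_dim should be 1 + latent_dim, e.g.
#   net = ODEnet(hidden_dims=(512, 512, 512), input_shape=(3,),
#                context_dim=1 + 128, layer_type="concatsquash")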


class ODEfunc(nn.Module):
    def __init__(self, diffeq):
        super(ODEfunc, self).__init__()
        self.diffeq = diffeq
        self.divergence_fn = divergence_approx
        self.register_buffer("_num_evals", torch.tensor(0.))

    def before_odeint(self, e=None):
        # Fix the Hutchinson noise for the whole solve and reset the evaluation counter.
        self._e = e
        self._num_evals.fill_(0)

    def forward(self, t, states):
        y = states[0]
        # Broadcast the scalar time to a per-sample column so it can serve as (part of) the context.
        t = torch.ones(y.size(0), 1).to(y) * t.clone().detach().requires_grad_(True).type_as(y)
        self._num_evals += 1
        for state in states:
            state.requires_grad_(True)

        # Sample and fix the noise.
        if self._e is None:
            self._e = torch.randn_like(y, requires_grad=True).to(y)

        with torch.set_grad_enabled(True):
            if len(states) == 3:  # conditional CNF
                c = states[2]
                tc = torch.cat([t, c.view(y.size(0), -1)], dim=1)
                dy = self.diffeq(tc, y)
                divergence = self.divergence_fn(dy, y, e=self._e).unsqueeze(-1)
                return dy, -divergence, torch.zeros_like(c).requires_grad_(True)
            elif len(states) == 2:  # unconditional CNF
                dy = self.diffeq(t, y)
                divergence = self.divergence_fn(dy, y, e=self._e).view(-1, 1)
                return dy, -divergence
            else:
                assert 0, "`len(states)` should be 2 or 3"
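

if __name__ == "__main__":
    # Minimal smoke-test sketch of the conditional (three-state) path. It assumes
    # the `torchdiffeq` package is installed, as the surrounding CNF code expects;
    # batch size, point dimension, and latent size below are illustrative only.
    from torchdiffeq import odeint

    latent_dim = 16
    func = ODEfunc(ODEnet(hidden_dims=(64, 64), input_shape=(3,),
                          context_dim=1 + latent_dim, layer_type="concatsquash"))
    func.before_odeint()            # required: resets the eval counter, clears the fixed noise

    x = torch.randn(8, 3)           # batch of 3-D points
    logp = torch.zeros(8, 1)        # accumulated change in log-density
    c = torch.randn(8, latent_dim)  # per-shape latent code

    # Integrate (x, logp, c) from t=0 to t=1; c is carried along with zero derivative.
    states = odeint(func, (x, logp, c), torch.tensor([0.0, 1.0]))
    z, delta_logp, _ = [s[-1] for s in states]
    print(z.shape, delta_logp.shape, int(func._num_evals.item()))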