LION/models/score_sde/resnet.py
2023-01-23 00:14:49 -05:00

231 lines
9.2 KiB
Python

# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# NVIDIA CORPORATION & AFFILIATES and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION & AFFILIATES is strictly prohibited.
""" implement the gloabl prior for LION
"""
import torch.nn as nn
from loguru import logger
import functools
import torch
from ..utils import init_temb_fun, mask_inactive_variables
class SE(nn.Module):
def __init__(self, channel, reduction=8):
super().__init__()
self.fc = nn.Sequential(
nn.Conv2d(channel, channel // reduction, 1, 1, bias=False),
nn.ReLU(inplace=True),
nn.Conv2d(channel // reduction, channel, 1, 1, bias=False),
nn.Sigmoid()
)
def forward(self, inputs):
return inputs * self.fc(inputs)
class ResBlockSEClip(nn.Module):
"""
fixed the conv0 not used error in ResBlockSE
"""
def __init__(self, input_dim, output_dim):
super().__init__()
self.non_linearity = nn.ReLU(inplace=True)
self.input_dim = input_dim
self.output_dim = output_dim
self.conv1 = nn.Conv2d(input_dim*2, output_dim, 1, 1)
self.conv2 = nn.Conv2d(output_dim, output_dim, 1, 1)
in_ch = self.output_dim
self.SE = SE(in_ch)
def forward(self, x, t):
## logger.info('x: {}, t: {}, input_dim={}', x.shape, t.shape, self.input_dim)
clip_feat = t[:, self.input_dim:].contiguous()
t = t[:,:self.input_dim].contiguous()
output = x + t
output = torch.cat([output, clip_feat], dim=1).contiguous()
output = self.conv1(output)
output = self.non_linearity(output)
output = self.conv2(output)
output = self.non_linearity(output)
output = self.SE(output)
shortcut = x
return shortcut + output
def __repr__(self):
return "ResBlockSEClip(%d, %d)"%(self.input_dim, self.output_dim)
class ResBlockSEDrop(nn.Module):
"""
fixed the conv0 not used error in ResBlockSE
"""
def __init__(self, input_dim, output_dim, dropout):
super().__init__()
self.non_linearity = nn.ReLU(inplace=True)
self.input_dim = input_dim
self.output_dim = output_dim
self.conv1 = nn.Conv2d(input_dim, output_dim, 1, 1)
self.conv2 = nn.Conv2d(output_dim, output_dim, 1, 1)
in_ch = self.output_dim
self.SE = SE(in_ch)
self.dropout = nn.Dropout(dropout)
self.dropout_ratio = dropout
def forward(self, x, t):
output = x + t
output = self.conv1(output)
output = self.non_linearity(output)
output = self.dropout(output)
output = self.conv2(output)
output = self.non_linearity(output)
output = self.SE(output)
shortcut = x
return shortcut + output
def __repr__(self):
return "ResBlockSE_withdropout(%d, %d, drop=%f)" % (
self.input_dim, self.output_dim, self.dropout_ratio)
class ResBlock(nn.Module):
def __init__(self, input_dim, output_dim):
# resample=None, act=nn.ELU(),
# normalization=nn.InstanceNorm2d, adjust_padding=False, dilation=1):
super().__init__()
self.non_linearity = nn.ELU()
self.input_dim = input_dim
self.output_dim = output_dim
self.conv1 = nn.Conv2d(input_dim, output_dim, 1, 1)
self.conv2 = nn.Conv2d(output_dim, output_dim, 1, 1)
in_ch = self.output_dim
self.normalize1 = nn.GroupNorm(num_groups=min(in_ch // 4, 32),
num_channels=in_ch, eps=1e-6)
self.normalize2 = nn.GroupNorm(num_groups=min(in_ch // 4, 32),
num_channels=in_ch, eps=1e-6)
def forward(self, x, t):
x = x + t
output = self.conv1(x)
output = self.normalize1(output)
output = self.non_linearity(output)
output = self.conv2(output)
output = self.normalize2(output)
output = self.non_linearity(output)
shortcut = x
return shortcut + output
def __repr__(self):
return "ResBlock(%d, %d)" % (self.input_dim, self.output_dim)
class Prior(nn.Module):
building_block = ResBlock
def __init__(self, args, num_input_channels, *oargs, **kwargs):
super().__init__()
# args: cfg.sde
# oargs: other argument: the global argument
self.condition_input = kwargs.get('condition_input', False)
self.cfg = oargs[0]
self.clip_forge_enable = self.cfg.clipforge.enable # kwargs.get('clipforge.enable', 0)
logger.info('[Build Resnet Prior] Has condition input: {}; clipforge {}; '
'learn_mixing_logit={}, ', self.condition_input,
self.clip_forge_enable, args.learn_mixing_logit)
self.act = act = nn.SiLU()
self.num_scales = args.num_scales_dae
self.num_input_channels = num_input_channels
self.nf = nf = args.num_channels_dae
num_cell_per_scale_dae = args.num_cell_per_scale_dae if 'num_cell_per_scale_dae' not in kwargs else kwargs[
'num_cell_per_scale_dae']
# take clip feature as input
if self.clip_forge_enable:
self.clip_feat_mapping = nn.Conv1d(self.cfg.clipforge.feat_dim, self.nf, 1)
# mixed_prediction #
self.mixed_prediction = args.mixed_prediction # This enables mixed prediction
if self.mixed_prediction:
logger.info('init-mixing_logit = {}, after sigmoid = {}',
args.mixing_logit_init, torch.sigmoid(torch.tensor(args.mixing_logit_init)))
assert(args.mixing_logit_init), f'require learning'
# if not args.learn_mixing_logit and args.hypara_mixing_logit:
# # not learn, treat it as hyparameters
# init = args.mixing_logit_init * torch.ones(size=[1, num_input_channels, 1, 1])
# self.mixing_logit = torch.nn.Parameter(init, requires_grad=False) # not update
# self.is_active = None
# elif not args.learn_mixing_logit: # not learn, loaded from c04cd1h exp
# init = torch.load('../exp/1110/chair/c04cd1h_hvae3s_390f8dhInitSepesTrainvae0_hvaeB72l1E4W1/mlogit.pt')
# self.mixing_logit = torch.nn.Parameter(init, requires_grad=False)
# self.is_active = None
# else:
if True:
init = args.mixing_logit_init * torch.ones(size=[1, num_input_channels, 1, 1])
self.mixing_logit = torch.nn.Parameter(init, requires_grad=True)
self.is_active = None
else: # no mixing_logit
self.mixing_logit = None
self.is_active = None
self.embedding_dim = args.embedding_dim
self.embedding_dim_mult = 4
self.temb_fun = init_temb_fun(args.embedding_type, args.embedding_scale, args.embedding_dim)
logger.info('[temb_fun] embedding_type={}, embedding_scale={}, embedding_dim={}',
args.embedding_type, args.embedding_scale, args.embedding_dim)
# exit()
modules = []
modules.append(nn.Conv2d(self.embedding_dim, self.embedding_dim * 4, 1, 1))
modules.append(nn.Conv2d(self.embedding_dim * 4, nf, 1, 1))
self.temb_layer = nn.Sequential(*modules)
modules = []
input_channels = num_input_channels
self.input_layer = nn.Conv2d(input_channels, nf, 1, 1)
in_ch = nf
for i_block in range(args.num_cell_per_scale_dae):
modules.append(self.building_block(nf, nf))
self.output_layer = nn.Conv2d(nf, input_channels, 1, 1)
self.all_modules = nn.ModuleList(modules)
def forward(self, x, t, **kwargs):
# timestep/noise_level embedding; only for continuous training
# time embedding
if t.dim() == 0:
t = t.expand(1)
temb = self.temb_fun(t)[:, :, None, None] # make it 4d
temb = self.temb_layer(temb)
if self.clip_forge_enable:
clip_feat = kwargs['clip_feat']
clip_feat = self.clip_feat_mapping(clip_feat[:, :, None])[:, :, :, None] # B,D -> BD1->B,D,1,1
if temb.shape[0] == 1 and temb.shape[0] < clip_feat.shape[0]:
temb = temb.expand(clip_feat.shape[0], -1, -1, -1)
temb = torch.cat([temb, clip_feat], dim=1) # add to temb feature
# mask out inactive variables
if self.mixed_prediction and self.is_active is not None:
x = mask_inactive_variables(x, self.is_active)
x = self.input_layer(x)
for layer in self.all_modules:
enc_input = x
x = layer(enc_input, temb)
h = self.output_layer(x)
return h
class PriorSEDrop(Prior):
def __init__(self, *args, **kwargs):
self.building_block = functools.partial(ResBlockSEDrop, dropout=args[0].dropout)
super().__init__(*args, **kwargs)
class PriorSEClip(Prior):
building_block = ResBlockSEClip
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)