from typing import cast
from warnings import warn

import pytest
from torch import Generator, Tensor, allclose, device as Device, equal, isclose, randn, tensor

from refiners.fluxion import manual_seed
from refiners.foundationals.latent_diffusion.solvers import (
    DDIM,
    DDPM,
    DPMSolver,
    Euler,
    LCMSolver,
    ModelPredictionType,
    NoiseSchedule,
    Solver,
    SolverParams,
    TimestepSpacing,
)
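

# These tests check refiners solvers against their diffusers counterparts:
# both sides are built with matching configurations, then their timestep
# schedules and per-step outputs are compared on identical random inputs.
#
# For context, a rough sketch of how a solver drives a denoising loop; the
# `unet` callable is hypothetical, not part of this suite:
#
#   solver = DDIM(num_inference_steps=30)
#   for step, timestep in enumerate(solver.timesteps):
#       predicted_noise = unet(x, timestep)
#       x = solver(x=x, predicted_noise=predicted_noise, step=step)


# For DDPM, only the timestep schedule is compared with diffusers.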
def test_ddpm_diffusers():
    from diffusers import DDPMScheduler  # type: ignore

    diffusers_scheduler = DDPMScheduler(beta_schedule="scaled_linear", beta_start=0.00085, beta_end=0.012)
    diffusers_scheduler.set_timesteps(1000)
    refiners_scheduler = DDPM(num_inference_steps=1000)

    assert equal(diffusers_scheduler.timesteps, refiners_scheduler.timesteps)
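

# DPMSolver (multistep) vs diffusers' DPMSolverMultistepScheduler, over both
# short and long schedules, with and without a first-order last step.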
@pytest.mark.parametrize("n_steps, last_step_first_order", [(5, False), (5, True), (30, False), (30, True)])
def test_dpm_solver_diffusers(n_steps: int, last_step_first_order: bool):
    from diffusers import DPMSolverMultistepScheduler as DiffuserScheduler  # type: ignore

    manual_seed(0)
    diffusers_scheduler = DiffuserScheduler(
        beta_schedule="scaled_linear",
        beta_start=0.00085,
        beta_end=0.012,
        lower_order_final=False,
        euler_at_final=last_step_first_order,
        final_sigmas_type="sigma_min",  # default before Diffusers 0.26.0
    )
    diffusers_scheduler.set_timesteps(n_steps)
    refiners_scheduler = DPMSolver(
        num_inference_steps=n_steps,
        last_step_first_order=last_step_first_order,
    )
    assert equal(refiners_scheduler.timesteps, diffusers_scheduler.timesteps)

    sample = randn(1, 3, 32, 32)
    predicted_noise = randn(1, 3, 32, 32)

    for step, timestep in enumerate(diffusers_scheduler.timesteps):
        diffusers_output = cast(Tensor, diffusers_scheduler.step(predicted_noise, timestep, sample).prev_sample)  # type: ignore
        refiners_output = refiners_scheduler(x=sample, predicted_noise=predicted_noise, step=step)
        assert allclose(diffusers_output, refiners_output, rtol=0.01), f"outputs differ at step {step}"
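

# DDIM vs diffusers' DDIMScheduler: same schedule, same per-step denoised outputs.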
def test_ddim_diffusers():
    from diffusers import DDIMScheduler  # type: ignore

    manual_seed(0)
    diffusers_scheduler = DDIMScheduler(
        beta_end=0.012,
        beta_schedule="scaled_linear",
        beta_start=0.00085,
        num_train_timesteps=1000,
        steps_offset=1,
        clip_sample=False,
    )
    diffusers_scheduler.set_timesteps(30)
    refiners_scheduler = DDIM(num_inference_steps=30)
    assert equal(refiners_scheduler.timesteps, diffusers_scheduler.timesteps)

    sample = randn(1, 4, 32, 32)
    predicted_noise = randn(1, 4, 32, 32)

    for step, timestep in enumerate(diffusers_scheduler.timesteps):
        diffusers_output = cast(Tensor, diffusers_scheduler.step(predicted_noise, timestep, sample).prev_sample)  # type: ignore
        refiners_output = refiners_scheduler(x=sample, predicted_noise=predicted_noise, step=step)

        assert allclose(diffusers_output, refiners_output, rtol=0.01), f"outputs differ at step {step}"
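

# Euler vs diffusers' EulerDiscreteScheduler, for both noise (epsilon) and
# sample prediction types; also checks the initial noise scale (init_noise_sigma).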
@pytest.mark.parametrize("model_prediction_type", [ModelPredictionType.NOISE, ModelPredictionType.SAMPLE])
def test_euler_diffusers(model_prediction_type: ModelPredictionType):
    from diffusers import EulerDiscreteScheduler  # type: ignore

    manual_seed(0)
    diffusers_prediction_type = "epsilon" if model_prediction_type == ModelPredictionType.NOISE else "sample"
    diffusers_scheduler = EulerDiscreteScheduler(
        beta_end=0.012,
        beta_schedule="scaled_linear",
        beta_start=0.00085,
        num_train_timesteps=1000,
        steps_offset=1,
        timestep_spacing="linspace",
        use_karras_sigmas=False,
        prediction_type=diffusers_prediction_type,
    )
    diffusers_scheduler.set_timesteps(30)
    refiners_scheduler = Euler(num_inference_steps=30, params=SolverParams(model_prediction_type=model_prediction_type))
    assert equal(refiners_scheduler.timesteps, diffusers_scheduler.timesteps)

    sample = randn(1, 4, 32, 32)
    predicted_noise = randn(1, 4, 32, 32)

    ref_init_noise_sigma = diffusers_scheduler.init_noise_sigma  # type: ignore
    assert isinstance(ref_init_noise_sigma, Tensor)
    assert isclose(ref_init_noise_sigma, refiners_scheduler.init_noise_sigma), "init_noise_sigma differs"

    for step, timestep in enumerate(diffusers_scheduler.timesteps):
        diffusers_output = cast(Tensor, diffusers_scheduler.step(predicted_noise, timestep, sample).prev_sample)  # type: ignore
        refiners_output = refiners_scheduler(x=sample, predicted_noise=predicted_noise, step=step)

        assert allclose(diffusers_output, refiners_output, rtol=0.02), f"outputs differ at step {step}"
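

# LCMSolver vs diffusers' LCMScheduler. The scheduler is stochastic, so both
# sides consume identically seeded generators; the per-timestep scale factors
# and noise ratios are checked as well.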
def test_lcm_diffusers():
    from diffusers import LCMScheduler  # type: ignore

    manual_seed(0)

    # LCMScheduler is stochastic; make sure we use identical generators.
    diffusers_generator = Generator().manual_seed(42)
    refiners_generator = Generator().manual_seed(42)

    diffusers_scheduler = LCMScheduler()
    diffusers_scheduler.set_timesteps(4)
    refiners_scheduler = LCMSolver(num_inference_steps=4)
    assert equal(refiners_scheduler.timesteps, diffusers_scheduler.timesteps)

    sample = randn(1, 4, 32, 32)
    predicted_noise = randn(1, 4, 32, 32)

    for step, timestep in enumerate(diffusers_scheduler.timesteps):
        alpha_prod_t = diffusers_scheduler.alphas_cumprod[timestep]
        diffusers_noise_ratio = (1 - alpha_prod_t).sqrt()
        diffusers_scale_factor = alpha_prod_t.sqrt()

        refiners_scale_factor = refiners_scheduler.cumulative_scale_factors[timestep]
        refiners_noise_ratio = refiners_scheduler.noise_std[timestep]

        assert refiners_scale_factor == diffusers_scale_factor
        assert refiners_noise_ratio == diffusers_noise_ratio

        d_out = diffusers_scheduler.step(predicted_noise, timestep, sample, generator=diffusers_generator)  # type: ignore
        diffusers_output = cast(Tensor, d_out.prev_sample)  # type: ignore

        refiners_output = refiners_scheduler(
            x=sample,
            predicted_noise=predicted_noise,
            step=step,
            generator=refiners_generator,
        )

        assert allclose(refiners_output, diffusers_output, rtol=0.01), f"outputs differ at step {step}"
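

# remove_noise should predict the fully denoised sample, matching diffusers'
# pred_original_sample from DDIMScheduler.step.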
def test_solver_remove_noise():
    from diffusers import DDIMScheduler  # type: ignore

    manual_seed(0)
    diffusers_scheduler = DDIMScheduler(
        beta_end=0.012,
        beta_schedule="scaled_linear",
        beta_start=0.00085,
        num_train_timesteps=1000,
        steps_offset=1,
        clip_sample=False,
    )
    diffusers_scheduler.set_timesteps(30)
    refiners_scheduler = DDIM(num_inference_steps=30)

    sample = randn(1, 4, 32, 32)
    noise = randn(1, 4, 32, 32)

    for step, timestep in enumerate(diffusers_scheduler.timesteps):
        diffusers_output = cast(Tensor, diffusers_scheduler.step(noise, timestep, sample).pred_original_sample)  # type: ignore
        refiners_output = refiners_scheduler.remove_noise(x=sample, noise=noise, step=step)
        assert allclose(diffusers_output, refiners_output, rtol=0.01), f"outputs differ at step {step}"
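

# add_noise should keep tensors on the solver's device (only meaningful on GPU,
# so the test is skipped when the test device is the CPU).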
def test_solver_device(test_device: Device):
    if test_device.type == "cpu":
        warn("not running on CPU, skipping")
        pytest.skip()

    scheduler = DDIM(num_inference_steps=30, device=test_device)
    x = randn(1, 4, 32, 32, device=test_device)
    noise = randn(1, 4, 32, 32, device=test_device)
    noised = scheduler.add_noise(x, noise, scheduler.first_inference_step)
    assert noised.device == test_device
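

# add_noise should broadcast over a batch: passing a list of steps must give
# the same result per sample as the equivalent single-sample call.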
def test_solver_add_noise(test_device: Device):
    scheduler = DDIM(num_inference_steps=30, device=test_device)
    latent = randn(1, 4, 32, 32, device=test_device)
    noise = randn(1, 4, 32, 32, device=test_device)
    noised = scheduler.add_noise(
        x=latent,
        noise=noise,
        step=0,
    )
    noised_double = scheduler.add_noise(
        x=latent.repeat(2, 1, 1, 1),
        noise=noise.repeat(2, 1, 1, 1),
        step=[0, 0],
    )
    assert allclose(noised, noised_double[0])
    assert allclose(noised, noised_double[1])
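

# The scale factor table always covers the 1000 training timesteps, and its
# endpoints are pinned to the initial/final diffusion rates, whatever the schedule.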
@pytest.mark.parametrize("noise_schedule", [NoiseSchedule.UNIFORM, NoiseSchedule.QUADRATIC, NoiseSchedule.KARRAS])
def test_solver_noise_schedules(noise_schedule: NoiseSchedule, test_device: Device):
    scheduler = DDIM(
        num_inference_steps=30,
        params=SolverParams(noise_schedule=noise_schedule),
        device=test_device,
    )
    assert len(scheduler.scale_factors) == 1000
    assert scheduler.scale_factors[0] == 1 - scheduler.params.initial_diffusion_rate
    assert scheduler.scale_factors[-1] == 1 - scheduler.params.final_diffusion_rate
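

# Timestep spacing strategies. For example, LEADING with 1000 training steps,
# 10 inference steps and offset 1 uses a stride of 1000 // 10 = 100, giving
# [901, 801, ..., 1] once the offset is applied; TRAILING walks back from
# 1000 in the same stride without the offset.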
def test_solver_timestep_spacing():
    # Tests that we get the results from [[arXiv:2305.08891] Common Diffusion Noise Schedules and Sample Steps are Flawed](https://arxiv.org/abs/2305.08891), table 2.
    linspace_int = Solver.generate_timesteps(
        spacing=TimestepSpacing.LINSPACE_ROUNDED,
        num_inference_steps=10,
        num_train_timesteps=1000,
        offset=1,
    )
    assert equal(linspace_int, tensor([1000, 889, 778, 667, 556, 445, 334, 223, 112, 1]))

    leading = Solver.generate_timesteps(
        spacing=TimestepSpacing.LEADING,
        num_inference_steps=10,
        num_train_timesteps=1000,
        offset=1,
    )
    assert equal(leading, tensor([901, 801, 701, 601, 501, 401, 301, 201, 101, 1]))

    trailing = Solver.generate_timesteps(
        spacing=TimestepSpacing.TRAILING,
        num_inference_steps=10,
        num_train_timesteps=1000,
        offset=1,
    )
    assert equal(trailing, tensor([1000, 900, 800, 700, 600, 500, 400, 300, 200, 100]))