import torch
import torch.nn as nn
from packaging import version
# import torch._dynamo
# torch._dynamo.config.suppress_errors = True
# torch._dynamo.config.cache_size_limit = 512

# Import path used by configs (e.g. a DiffusionEngine's network_wrapper
# default in sgm) to locate and instantiate OpenAIWrapper below.
OPENAIUNETWRAPPER = "sgm.modules.diffusionmodules.wrappers.OpenAIWrapper"


class IdentityWrapper(nn.Module):
    """Thin pass-through wrapper that optionally compiles the wrapped
    diffusion model."""

    def __init__(self, diffusion_model, compile_model: bool = False):
        super().__init__()
        # Use torch.compile only when it exists (PyTorch >= 2.0) and was
        # requested; otherwise fall back to a no-op.
        compile = (
            torch.compile
            if (version.parse(torch.__version__) >= version.parse("2.0.0"))
            and compile_model
            else lambda x: x
        )
        self.diffusion_model = compile(diffusion_model)

    def forward(self, *args, **kwargs):
        return self.diffusion_model(*args, **kwargs)
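
# A minimal usage sketch (hypothetical stand-in model; shapes are
# illustrative, not part of this module):
#
#   toy = nn.Conv2d(4, 4, kernel_size=1)              # stand-in for a UNet
#   wrapper = IdentityWrapper(toy, compile_model=False)
#   y = wrapper(torch.randn(1, 4, 8, 8))              # identical to toy(...)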


class OpenAIWrapper(IdentityWrapper):
    """Maps the conditioning dict onto an OpenAI-style UNet: "concat" is
    channel-concatenated to the input, "crossattn" becomes the
    cross-attention context, and "vector" the vector conditioning ``y``."""

    def forward(
        self, x: torch.Tensor, t: torch.Tensor, c: dict, **kwargs
    ) -> torch.Tensor:
        # torch.cat ignores the 1-D empty default, so a missing "concat"
        # key leaves x unchanged.
        x = torch.cat((x, c.get("concat", torch.Tensor([]).type_as(x))), dim=1)
        return self.diffusion_model(
            x,
            timesteps=t,
            context=c.get("crossattn", None),
            y=c.get("vector", None),
            **kwargs,
        )
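
# Sketch of the conditioning dict this wrapper consumes (keys as read
# above; shapes are illustrative assumptions for an SDXL-like setup):
#
#   c = {
#       "concat": torch.randn(b, 4, h, w),      # concatenated along dim 1
#       "crossattn": torch.randn(b, 77, 2048),  # cross-attention context
#       "vector": torch.randn(b, 2816),         # pooled "y" conditioning
#   }
#   out = OpenAIWrapper(unet)(x, t, c)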


class OpenAIHalfWrapper(IdentityWrapper):
    """Variant of OpenAIWrapper that runs the UNet entirely in float16 and
    casts the output back to float32."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.diffusion_model = self.diffusion_model.half()

    def forward(
        self, x: torch.Tensor, t: torch.Tensor, c: dict, **kwargs
    ) -> torch.Tensor:
        x = torch.cat((x, c.get("concat", torch.Tensor([]).type_as(x))), dim=1)
        _context = c.get("crossattn", None)
        _y = c.get("vector", None)
        # Cast every input the half-precision model sees to float16.
        if _context is not None:
            _context = _context.half()
        if _y is not None:
            _y = _y.half()
        x = x.half()
        t = t.half()

        out = self.diffusion_model(
            x,
            timesteps=t,
            context=_context,
            y=_y,
            **kwargs,
        )
        return out.float()
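
# The wrapper is dtype-transparent to callers: inputs are down-cast to fp16
# and the result up-cast to fp32. Illustrative check (hypothetical unet,
# conditioning dict, and shapes):
#
#   half = OpenAIHalfWrapper(unet)
#   out = half(torch.randn(b, 4, 64, 64), torch.randint(0, 1000, (b,)), c)
#   assert out.dtype == torch.float32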


class ControlWrapper(nn.Module):
    """Wrapper that pairs the diffusion model with a separately loaded
    control model (ControlNet-style): the control model's output is fed to
    the UNet and scaled by ``control_scale``."""

    def __init__(
        self, diffusion_model, compile_model: bool = False, dtype=torch.float32
    ):
        super().__init__()
        # Same torch.compile gating as IdentityWrapper; kept as an attribute
        # so load_control_model can reuse it.
        self.compile = (
            torch.compile
            if (version.parse(torch.__version__) >= version.parse("2.0.0"))
            and compile_model
            else lambda x: x
        )
        self.diffusion_model = self.compile(diffusion_model)
        self.control_model = None
        self.dtype = dtype

    def load_control_model(self, control_model):
        self.control_model = self.compile(control_model)

    def forward(
        self, x: torch.Tensor, t: torch.Tensor, c: dict, control_scale=1, **kwargs
    ) -> torch.Tensor:
        with torch.autocast("cuda", dtype=self.dtype):
            control = self.control_model(
                x=c.get("control", None),
                timesteps=t,
                xt=x,
                control_vector=c.get("control_vector", None),
                mask_x=c.get("mask_x", None),
                context=c.get("crossattn", None),
                y=c.get("vector", None),
            )
            out = self.diffusion_model(
                x,
                timesteps=t,
                context=c.get("crossattn", None),
                y=c.get("vector", None),
                control=control,
                control_scale=control_scale,
                **kwargs,
            )
        return out.float()
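
# Usage sketch (hypothetical models; the control model must be attached
# before the first forward call, and its keyword signature (x, timesteps,
# xt, control_vector, mask_x, context, y) must match the call above):
#
#   wrapper = ControlWrapper(unet, compile_model=False, dtype=torch.bfloat16)
#   wrapper.load_control_model(control_net)
#   c = {"control": ctrl_img, "crossattn": ctx, "vector": vec}
#   out = wrapper(x, t, c, control_scale=0.8)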