Wan2.1-VAE / handler.py
hlky's picture
hlky HF staff
Update handler.py
dbaddac verified
from typing import cast, Union
import torch
from diffusers import AutoencoderKLWan
from diffusers.video_processor import VideoProcessor
from diffusers.utils import export_to_video
class EndpointHandler:
def __init__(self, path=""):
self.device = "cuda"
self.dtype = torch.float16
self.vae = cast(
AutoencoderKLWan,
AutoencoderKLWan.from_pretrained(path, torch_dtype=self.dtype)
.to(self.device, self.dtype)
.eval(),
)
self.vae_scale_factor_temporal = (
2 ** sum(self.vae.temperal_downsample) if getattr(self, "vae", None) else 4
)
self.vae_scale_factor_spatial = (
2 ** len(self.vae.temperal_downsample) if getattr(self, "vae", None) else 8
)
self.video_processor = VideoProcessor(
vae_scale_factor=self.vae_scale_factor_spatial
)
@torch.no_grad()
def __call__(self, data) -> Union[torch.Tensor, bytes]:
"""
Args:
data (:obj:):
includes the input data and the parameters for the inference.
"""
tensor = cast(torch.Tensor, data["inputs"])
parameters = cast(dict, data.get("parameters", {}))
do_scaling = cast(bool, parameters.get("do_scaling", True))
output_type = cast(str, parameters.get("output_type", "pil"))
partial_postprocess = cast(bool, parameters.get("partial_postprocess", False))
if partial_postprocess and output_type != "pt":
output_type = "pt"
tensor = tensor.to(self.device, self.dtype)
if do_scaling:
latents_mean = (
torch.tensor(self.vae.config.latents_mean)
.view(1, self.vae.config.z_dim, 1, 1, 1)
.to(tensor.device, tensor.dtype)
)
latents_std = 1.0 / torch.tensor(self.vae.config.latents_std).view(
1, self.vae.config.z_dim, 1, 1, 1
).to(tensor.device, tensor.dtype)
tensor = tensor / latents_std + latents_mean
with torch.no_grad():
frames = cast(torch.Tensor, self.vae.decode(tensor, return_dict=False)[0])
if partial_postprocess:
frames = frames[0].permute(1, 0, 2, 3)
frames = torch.stack([(frame * 0.5 + 0.5).clamp(0, 1) for frame in frames])
frames = frames.permute(0, 2, 3, 1).contiguous().float()
frames = (frames * 255).round().to(torch.uint8)
elif output_type == "pil":
frames = cast(
torch.Tensor,
self.video_processor.postprocess_video(frames, output_type="pt")[0],
)
elif output_type == "mp4":
frames = cast(
torch.Tensor,
self.video_processor.postprocess_video(frames, output_type="pil")[0],
)
path = export_to_video(frames, fps=16)
with open(path, "rb") as f:
frames = f.read()
elif output_type == "pt":
frames = frames
return frames