File size: 3,082 Bytes
04619f3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dbaddac
04619f3
 
 
dbaddac
 
04619f3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from typing import cast, Union

import torch

from diffusers import AutoencoderKLWan
from diffusers.video_processor import VideoProcessor
from diffusers.utils import export_to_video


class EndpointHandler:
    def __init__(self, path=""):
        self.device = "cuda"
        self.dtype = torch.float16
        self.vae = cast(
            AutoencoderKLWan,
            AutoencoderKLWan.from_pretrained(path, torch_dtype=self.dtype)
            .to(self.device, self.dtype)
            .eval(),
        )

        self.vae_scale_factor_temporal = (
            2 ** sum(self.vae.temperal_downsample) if getattr(self, "vae", None) else 4
        )
        self.vae_scale_factor_spatial = (
            2 ** len(self.vae.temperal_downsample) if getattr(self, "vae", None) else 8
        )
        self.video_processor = VideoProcessor(
            vae_scale_factor=self.vae_scale_factor_spatial
        )

    @torch.no_grad()
    def __call__(self, data) -> Union[torch.Tensor, bytes]:
        """
        Args:
            data (:obj:):
                includes the input data and the parameters for the inference.
        """
        tensor = cast(torch.Tensor, data["inputs"])
        parameters = cast(dict, data.get("parameters", {}))
        do_scaling = cast(bool, parameters.get("do_scaling", True))
        output_type = cast(str, parameters.get("output_type", "pil"))
        partial_postprocess = cast(bool, parameters.get("partial_postprocess", False))
        if partial_postprocess and output_type != "pt":
            output_type = "pt"

        tensor = tensor.to(self.device, self.dtype)

        if do_scaling:
            latents_mean = (
                torch.tensor(self.vae.config.latents_mean)
                .view(1, self.vae.config.z_dim, 1, 1, 1)
                .to(tensor.device, tensor.dtype)
            )
            latents_std = 1.0 / torch.tensor(self.vae.config.latents_std).view(
                1, self.vae.config.z_dim, 1, 1, 1
            ).to(tensor.device, tensor.dtype)
            tensor = tensor / latents_std + latents_mean

        with torch.no_grad():
            frames = cast(torch.Tensor, self.vae.decode(tensor, return_dict=False)[0])

        if partial_postprocess:
            frames = frames[0].permute(1, 0, 2, 3)
            frames = torch.stack([(frame * 0.5 + 0.5).clamp(0, 1) for frame in frames])
            frames = frames.permute(0, 2, 3, 1).contiguous().float()
            frames = (frames * 255).round().to(torch.uint8)
        elif output_type == "pil":
            frames = cast(
                torch.Tensor,
                self.video_processor.postprocess_video(frames, output_type="pt")[0],
            )
        elif output_type == "mp4":
            frames = cast(
                torch.Tensor,
                self.video_processor.postprocess_video(frames, output_type="pil")[0],
            )
            path = export_to_video(frames, fps=16)
            with open(path, "rb") as f:
                frames = f.read()
        elif output_type == "pt":
            frames = frames

        return frames