File size: 3,494 Bytes
8b7a3d1
 
 
 
c0a7c3c
 
8b7a3d1
 
c0a7c3c
8b7a3d1
 
60ae079
c0a7c3c
8b7a3d1
 
c0a7c3c
 
 
 
 
8b7a3d1
 
 
 
 
 
 
c0a7c3c
8b7a3d1
 
c0a7c3c
8b7a3d1
 
 
 
 
 
c0a7c3c
 
8b7a3d1
 
 
 
 
 
 
 
 
 
 
c0a7c3c
 
8b7a3d1
 
c0a7c3c
 
8b7a3d1
c0a7c3c
765072b
 
 
 
 
c0a7c3c
 
765072b
 
c0a7c3c
60ae079
 
c0a7c3c
 
8b7a3d1
 
 
c0a7c3c
8b7a3d1
c0a7c3c
 
8b7a3d1
 
 
 
 
 
 
c0a7c3c
8b7a3d1
 
 
 
c0a7c3c
 
 
8b7a3d1
 
 
 
c0a7c3c
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from __future__ import annotations

import gc
import pathlib
import sys
import tempfile

import gradio as gr
import imageio
import PIL.Image
import torch
from diffusers.utils.import_utils import is_xformers_available
from einops import rearrange
from huggingface_hub import ModelCard

sys.path.append('Tune-A-Video')

from tuneavideo.models.unet import UNet3DConditionModel
from tuneavideo.pipelines.pipeline_tuneavideo import TuneAVideoPipeline


class InferencePipeline:
    def __init__(self, hf_token: str | None = None):
        self.hf_token = hf_token
        self.pipe = None
        self.device = torch.device(
            'cuda:0' if torch.cuda.is_available() else 'cpu')
        self.model_id = None

    def clear(self) -> None:
        self.model_id = None
        del self.pipe
        self.pipe = None
        torch.cuda.empty_cache()
        gc.collect()

    @staticmethod
    def check_if_model_is_local(model_id: str) -> bool:
        return pathlib.Path(model_id).exists()

    @staticmethod
    def get_model_card(model_id: str,
                       hf_token: str | None = None) -> ModelCard:
        if InferencePipeline.check_if_model_is_local(model_id):
            card_path = (pathlib.Path(model_id) / 'README.md').as_posix()
        else:
            card_path = model_id
        return ModelCard.load(card_path, token=hf_token)

    @staticmethod
    def get_base_model_info(model_id: str, hf_token: str | None = None) -> str:
        card = InferencePipeline.get_model_card(model_id, hf_token)
        return card.data.base_model

    def load_pipe(self, model_id: str) -> None:
        if model_id == self.model_id:
            return
        base_model_id = self.get_base_model_info(model_id, self.hf_token)
        unet = UNet3DConditionModel.from_pretrained(
            model_id,
            subfolder='unet',
            torch_dtype=torch.float16,
            use_auth_token=self.hf_token)
        pipe = TuneAVideoPipeline.from_pretrained(base_model_id,
                                                  unet=unet,
                                                  torch_dtype=torch.float16,
                                                  use_auth_token=self.hf_token)
        pipe = pipe.to(self.device)
        if is_xformers_available():
            pipe.unet.enable_xformers_memory_efficient_attention()
        self.pipe = pipe
        self.model_id = model_id  # type: ignore

    def run(
        self,
        model_id: str,
        prompt: str,
        video_length: int,
        fps: int,
        seed: int,
        n_steps: int,
        guidance_scale: float,
    ) -> PIL.Image.Image:
        if not torch.cuda.is_available():
            raise gr.Error('CUDA is not available.')

        self.load_pipe(model_id)

        generator = torch.Generator(device=self.device).manual_seed(seed)
        out = self.pipe(
            prompt,
            video_length=video_length,
            width=512,
            height=512,
            num_inference_steps=n_steps,
            guidance_scale=guidance_scale,
            generator=generator,
        )  # type: ignore

        frames = rearrange(out.videos[0], 'c t h w -> t h w c')
        frames = (frames * 255).to(torch.uint8).numpy()

        out_file = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
        writer = imageio.get_writer(out_file.name, fps=fps)
        for frame in frames:
            writer.append_data(frame)
        writer.close()

        return out_file.name