# ltx_server_refactored.py — VideoService (modular version with simple overlap chunking)
# ==============================================================================
# 0. ENVIRONMENT SETUP AND IMPORTS
# ==============================================================================
import os
import sys
import gc
import cv2
import yaml
import time
import json
import random
import shutil
import warnings
import tempfile
import traceback
import subprocess
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union
# --- Logging and warning configuration ---
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
from huggingface_hub import logging as hf_logging
hf_logging.set_verbosity_error()
# --- ML / processing library imports ---
import torch
import torch.nn.functional as F
import numpy as np
from PIL import Image
from einops import rearrange
from huggingface_hub import hf_hub_download
from safetensors import safe_open
from managers.vae_manager import vae_manager_singleton
from tools.video_encode_tool import video_encode_tool_singleton
# --- Global constants ---
LTXV_DEBUG = True  # Set to False to disable debug logging
LTXV_FRAME_LOG_EVERY = 8
DEPS_DIR = Path("/data")
LTX_VIDEO_REPO_DIR = DEPS_DIR / "LTX-Video"
RESULTS_DIR = Path("/app/output")
DEFAULT_FPS = 24.0
# ==============================================================================
# 1. ENVIRONMENT SETUP AND HELPER FUNCTIONS
# ==============================================================================
def _run_setup_script():
    """Runs setup.py to clone dependencies when the LTX-Video repo is missing."""
    setup_script_path = "setup.py"
    if not os.path.exists(setup_script_path):
        print("[DEBUG] 'setup.py' not found. Skipping dependency cloning.")
        return
    print(f"[DEBUG] Repository not found at {LTX_VIDEO_REPO_DIR}. Running setup.py...")
    try:
        subprocess.run([sys.executable, setup_script_path], check=True, capture_output=True, text=True)
        print("[DEBUG] 'setup.py' finished successfully.")
    except subprocess.CalledProcessError as e:
        print(f"[ERROR] 'setup.py' failed (exit code {e.returncode}).\nOutput:\n{e.stdout}\n{e.stderr}")
        sys.exit(1)
def add_deps_to_path(repo_path: Path):
    """Adds the repository directory to sys.path so its packages can be imported."""
    resolved_path = str(repo_path.resolve())
    if resolved_path not in sys.path:
        sys.path.insert(0, resolved_path)
        if LTXV_DEBUG:
            print(f"[DEBUG] Added to sys.path: {resolved_path}")
# --- Initial environment setup ---
if not LTX_VIDEO_REPO_DIR.exists():
    _run_setup_script()
add_deps_to_path(LTX_VIDEO_REPO_DIR)
# --- Imports that depend on the path added above ---
from ltx_video.models.autoencoders.vae_encode import un_normalize_latents, normalize_latents
from ltx_video.pipelines.pipeline_ltx_video import adain_filter_latent
from ltx_video.models.autoencoders.latent_upsampler import LatentUpsampler
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXVideoPipeline
from transformers import T5EncoderModel, T5Tokenizer, AutoModelForCausalLM, AutoProcessor, AutoTokenizer
from ltx_video.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder
from ltx_video.models.transformers.symmetric_patchifier import SymmetricPatchifier
from ltx_video.models.transformers.transformer3d import Transformer3DModel
from ltx_video.schedulers.rf import RectifiedFlowScheduler
from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy
import ltx_video.pipelines.crf_compressor as crf_compressor
from ltx_video.models.autoencoders.vae_encode import (
    get_vae_size_scale_factor,
    latent_to_pixel_coords,
    vae_decode,
    vae_encode,
)
def create_latent_upsampler(latent_upsampler_model_path: str, device: str):
    """Loads the spatial latent upsampler and prepares it for inference."""
    latent_upsampler = LatentUpsampler.from_pretrained(latent_upsampler_model_path)
    latent_upsampler.to(device)
    latent_upsampler.eval()
    return latent_upsampler
def create_ltx_video_pipeline(
    ckpt_path: str,
    precision: str,
    text_encoder_model_name_or_path: str,
    sampler: Optional[str] = None,
    device: Optional[str] = None,
    enhance_prompt: bool = False,
    prompt_enhancer_image_caption_model_name_or_path: Optional[str] = None,
    prompt_enhancer_llm_model_name_or_path: Optional[str] = None,
) -> LTXVideoPipeline:
    """Builds an LTXVideoPipeline from a single safetensors checkpoint."""
    ckpt_path = Path(ckpt_path)
    assert os.path.exists(
        ckpt_path
    ), f"Ckpt path provided (--ckpt_path) {ckpt_path} does not exist"
    # The pipeline config is stored as JSON in the safetensors metadata.
    with safe_open(ckpt_path, framework="pt") as f:
        metadata = f.metadata()
        config_str = metadata.get("config")
        configs = json.loads(config_str)
        allowed_inference_steps = configs.get("allowed_inference_steps", None)
    vae = CausalVideoAutoencoder.from_pretrained(ckpt_path)
    transformer = Transformer3DModel.from_pretrained(ckpt_path)
    # Use the constructor if a sampler is specified, otherwise use from_pretrained
    if sampler == "from_checkpoint" or not sampler:
        scheduler = RectifiedFlowScheduler.from_pretrained(ckpt_path)
    else:
        scheduler = RectifiedFlowScheduler(
            sampler=("Uniform" if sampler.lower() == "uniform" else "LinearQuadratic")
        )
    text_encoder = T5EncoderModel.from_pretrained(
        text_encoder_model_name_or_path, subfolder="text_encoder"
    )
    patchifier = SymmetricPatchifier(patch_size=1)
    tokenizer = T5Tokenizer.from_pretrained(
        text_encoder_model_name_or_path, subfolder="tokenizer"
    )
    transformer = transformer.to(device)
    vae = vae.to(device)
    text_encoder = text_encoder.to(device)
    if enhance_prompt:
        prompt_enhancer_image_caption_model = AutoModelForCausalLM.from_pretrained(
            prompt_enhancer_image_caption_model_name_or_path, trust_remote_code=True
        )
        prompt_enhancer_image_caption_processor = AutoProcessor.from_pretrained(
            prompt_enhancer_image_caption_model_name_or_path, trust_remote_code=True
        )
        prompt_enhancer_llm_model = AutoModelForCausalLM.from_pretrained(
            prompt_enhancer_llm_model_name_or_path,
            torch_dtype="bfloat16",
        )
        prompt_enhancer_llm_tokenizer = AutoTokenizer.from_pretrained(
            prompt_enhancer_llm_model_name_or_path,
        )
    else:
        prompt_enhancer_image_caption_model = None
        prompt_enhancer_image_caption_processor = None
        prompt_enhancer_llm_model = None
        prompt_enhancer_llm_tokenizer = None
    vae = vae.to(torch.bfloat16)
    if precision == "bfloat16" and transformer.dtype != torch.bfloat16:
        transformer = transformer.to(torch.bfloat16)
    text_encoder = text_encoder.to(torch.bfloat16)
    # Assemble the submodels for the pipeline
    submodel_dict = {
        "transformer": transformer,
        "patchifier": patchifier,
        "text_encoder": text_encoder,
        "tokenizer": tokenizer,
        "scheduler": scheduler,
        "vae": vae,
        "prompt_enhancer_image_caption_model": prompt_enhancer_image_caption_model,
        "prompt_enhancer_image_caption_processor": prompt_enhancer_image_caption_processor,
        "prompt_enhancer_llm_model": prompt_enhancer_llm_model,
        "prompt_enhancer_llm_tokenizer": prompt_enhancer_llm_tokenizer,
        "allowed_inference_steps": allowed_inference_steps,
    }
    pipeline = LTXVideoPipeline(**submodel_dict)
    pipeline = pipeline.to(device)
    return pipeline
# ==============================================================================
# 2. MAIN VIDEO SERVICE CLASS
# ==============================================================================
class VideoService:
    """
    Encapsulated service for generating videos with the LTX-Video pipeline.
    Handles model loading, preprocessing, multi-step generation (low
    resolution, then upscale with denoise) and post-processing.
    """
    def __init__(self):
        """Initializes the service, loading configuration and models."""
        t0 = time.perf_counter()
        print("[INFO] Initializing VideoService...")
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.config = self._load_config("ltxv-13b-0.9.8-dev-fp8.yaml")
        self.pipeline, self.latent_upsampler = self._load_models_from_hub()
        self._move_models_to_device()
        self.runtime_autocast_dtype = self._get_precision_dtype()
        vae_manager_singleton.attach_pipeline(
            self.pipeline,
            device=self.device,
            autocast_dtype=self.runtime_autocast_dtype
        )
        self._tmp_dirs = set()
        RESULTS_DIR.mkdir(exist_ok=True)
        print(f"[INFO] VideoService ready. Initialization time: {time.perf_counter()-t0:.2f}s")
    # --------------------------------------------------------------------------
    # --- Public methods (service API) ---
    # --------------------------------------------------------------------------
    def generate_low_resolution(
        self, prompt: str, negative_prompt: str,
        height: int, width: int, duration_secs: float,
        guidance_scale: float, seed: Optional[int] = None,
        conditioning_items: Optional[List[ConditioningItem]] = None
    ) -> Tuple[str, str, int]:
        """
        Generates a low-resolution video and returns the paths to the video
        and its latents, plus the seed that was used.
        """
        used_seed = random.randint(0, 2**32 - 1) if seed is None else int(seed)
        self._seed_everything(used_seed)
        actual_num_frames = int(duration_secs * DEFAULT_FPS)
        downscaled_height, downscaled_width = self._calculate_downscaled_dims(height, width)
        first_pass_kwargs = {
            "prompt": prompt,
            "negative_prompt": negative_prompt,
            "height": downscaled_height,
            "width": downscaled_width,
            # At least 24 frames, plus one extra frame for the pipeline.
            "num_frames": max(24, actual_num_frames) + 1,
            "frame_rate": int(DEFAULT_FPS),
            "generator": torch.Generator(device=self.device).manual_seed(used_seed),
            "output_type": "latent",
            "conditioning_items": conditioning_items,
            "guidance_scale": float(guidance_scale),
            "is_video": True,
            "vae_per_channel_normalize": True,
            **(self.config.get("first_pass", {}))
        }
        temp_dir = tempfile.mkdtemp(prefix="ltxv_low_")
        self._register_tmp_dir(temp_dir)
        try:
            with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')):
                latents = self.pipeline(**first_pass_kwargs).images
                pixel_tensor = vae_manager_singleton.decode(latents.clone(), decode_timestep=float(self.config.get("decode_timestep", 0.05)))
            video_path = self._save_video_from_tensor(pixel_tensor, "low_res_video", used_seed, temp_dir)
            latents_path = self._save_latents_to_disk(latents, "latents_low_res", used_seed)
            return video_path, latents_path, used_seed
        except Exception as e:
            print(f"[ERROR] Low-resolution generation failed: {e}")
            traceback.print_exc()
            raise
        finally:
            self._finalize()
    def generate_upscale_denoise(
        self, latents_path: str, prompt: str,
        negative_prompt: str, height: int, width: int,
        num_frames: int, guidance_scale: float, seed: Optional[int] = None,
        conditioning_items: Optional[List[ConditioningItem]] = None
    ) -> Tuple[str, str]:
        """
        Applies upscaling, AdaIN and denoising to low-resolution latents.
        (The overlap-chunking path is currently disabled; the full latent
        tensor is refined in a single pass.)
        """
        used_seed = random.randint(0, 2**32 - 1) if seed is None else int(seed)
        self._seed_everything(used_seed)
        temp_dir = tempfile.mkdtemp(prefix="ltxv_up_")
        self._register_tmp_dir(temp_dir)
        try:
            latents_low = torch.load(latents_path).to(self.device)
            with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')):
                # Spatial upsampling is currently bypassed:
                upsampled_latents = latents_low  # self._upsample_and_filter_latents(latents_low)
                # chunks = self._split_latents_with_overlap(upsampled_latents)
                # refined_chunks = []
                # for chunk in chunks:
                #     if chunk.shape[2] <= 1: continue  # skip invalid chunks
                chunk = upsampled_latents
                second_pass_height = chunk.shape[3] * self.pipeline.vae_scale_factor
                second_pass_width = chunk.shape[4] * self.pipeline.vae_scale_factor
                second_pass_kwargs = {
                    "prompt": prompt,
                    "negative_prompt": negative_prompt,
                    "height": second_pass_height,
                    "width": second_pass_width,
                    "frame_rate": int(DEFAULT_FPS),
                    "num_frames": num_frames,
                    "latents": chunk,  # the full tensor is passed here
                    "guidance_scale": float(guidance_scale),
                    "output_type": "latent",
                    "generator": torch.Generator(device=self.device).manual_seed(used_seed),
                    "conditioning_items": conditioning_items,
                    "is_video": True,
                    "vae_per_channel_normalize": True,
                    **(self.config.get("second_pass", {}))
                }
                refined_chunk = self.pipeline(**second_pass_kwargs).images
                # refined_chunks.append(refined_chunk)
                del latents_low
                torch.cuda.empty_cache()
                final_latents = refined_chunk  # self._merge_chunks_with_overlap(refined_chunks)
                latents_path = self._save_latents_to_disk(final_latents, "latents_refined", used_seed)
                pixel_tensor = vae_manager_singleton.decode(final_latents, decode_timestep=float(self.config.get("decode_timestep", 0.05)))
            video_path = self._save_video_from_tensor(pixel_tensor, "refined_video", used_seed, temp_dir)
            return video_path, latents_path
        except Exception as e:
            print(f"[ERROR] Upscale and denoise failed: {e}")
            traceback.print_exc()
            raise
        finally:
            self._finalize()
    def encode_latents_to_mp4(self, latents_path: str, fps: int = int(DEFAULT_FPS)) -> str:
        """Decodes a saved latent tensor and writes it out as an MP4 video."""
        latents = torch.load(latents_path)
        temp_dir = tempfile.mkdtemp(prefix="ltxv_enc_")
        self._register_tmp_dir(temp_dir)
        seed = random.randint(0, 99999)  # seed is only used for the filename
        try:
            chunks = self._split_latents_with_overlap(latents)
            pixel_chunks = []
            with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')):
                for chunk in chunks:
                    if chunk.shape[2] == 0:
                        continue
                    pixel_chunk = vae_manager_singleton.decode(chunk.to(self.device), decode_timestep=float(self.config.get("decode_timestep", 0.05)))
                    pixel_chunks.append(pixel_chunk)
            final_pixel_tensor = self._merge_chunks_with_overlap(pixel_chunks)
            final_video_path = self._save_video_from_tensor(final_pixel_tensor, f"final_video_{seed}", seed, temp_dir, fps=fps)
            return final_video_path
        except Exception as e:
            print(f"[ERROR] Failed to encode latents to MP4: {e}")
            traceback.print_exc()
            raise
        finally:
            self._finalize()
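    # Usage sketch (hypothetical path, not from the original module): given a
    # tensor saved by _save_latents_to_disk, e.g.
    # "/app/output/latents_refined_1234.pt", calling
    # encode_latents_to_mp4("/app/output/latents_refined_1234.pt", fps=24)
    # splits it into two overlapping chunks, decodes each through the VAE
    # manager, and returns the path of the merged MP4.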
    # --------------------------------------------------------------------------
    # --- Internal helpers ---
    # --------------------------------------------------------------------------
    def _finalize(self):
        """Frees GPU memory and removes registered temporary directories."""
        if LTXV_DEBUG:
            print("[DEBUG] Finalize: starting cleanup...")
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.ipc_collect()
        # Remove every registered temporary directory
        for d in list(self._tmp_dirs):
            shutil.rmtree(d, ignore_errors=True)
            self._tmp_dirs.remove(d)
            if LTXV_DEBUG:
                print(f"[DEBUG] Temporary directory removed: {d}")
    def _load_config(self, config_filename: str) -> Dict:
        """Loads the YAML configuration file."""
        config_path = LTX_VIDEO_REPO_DIR / "configs" / config_filename
        print(f"[INFO] Loading configuration from: {config_path}")
        with open(config_path, "r") as file:
            return yaml.safe_load(file)
    def _load_models_from_hub(self):
        """Downloads the checkpoints and builds the pipeline and upsampler."""
        t0 = time.perf_counter()
        LTX_REPO = "Lightricks/LTX-Video"
        print("[INFO] Downloading main checkpoint...")
        self.config["checkpoint_path"] = hf_hub_download(
            repo_id=LTX_REPO, filename=self.config["checkpoint_path"],
            token=os.getenv("HF_TOKEN")
        )
        print(f"[INFO] Main checkpoint at: {self.config['checkpoint_path']}")
        print("[INFO] Building pipeline...")
        pipeline = create_ltx_video_pipeline(
            ckpt_path=self.config["checkpoint_path"],
            precision=self.config["precision"],
            text_encoder_model_name_or_path=self.config["text_encoder_model_name_or_path"],
            sampler=self.config["sampler"],
            device="cpu",  # load on CPU first
            enhance_prompt=False
        )
        print("[INFO] Pipeline built.")
        latent_upsampler = None
        if self.config.get("spatial_upscaler_model_path"):
            print("[INFO] Downloading spatial upscaler...")
            self.config["spatial_upscaler_model_path"] = hf_hub_download(
                repo_id=LTX_REPO, filename=self.config["spatial_upscaler_model_path"],
                token=os.getenv("HF_TOKEN")
            )
            print(f"[INFO] Upscaler at: {self.config['spatial_upscaler_model_path']}")
            print("[INFO] Building latent_upsampler...")
            latent_upsampler = create_latent_upsampler(self.config["spatial_upscaler_model_path"], device="cpu")
            print("[INFO] Latent upsampler built.")
        print(f"[INFO] Model loading finished in {time.perf_counter()-t0:.2f}s")
        return pipeline, latent_upsampler
    def _move_models_to_device(self):
        """Moves the loaded models to the compute device (GPU/CPU)."""
        print(f"[INFO] Moving models to device: {self.device}")
        self.pipeline.to(self.device)
        if self.latent_upsampler:
            self.latent_upsampler.to(self.device)
    def _get_precision_dtype(self) -> torch.dtype:
        """Determines the autocast dtype from the configured precision."""
        prec = str(self.config.get("precision", "")).lower()
        if prec in ["float8_e4m3fn", "bfloat16"]:
            return torch.bfloat16
        elif prec == "mixed_precision":
            return torch.float16
        return torch.float32
    @torch.no_grad()
    def _upsample_and_filter_latents(self, latents: torch.Tensor) -> torch.Tensor:
        """Applies spatial upsampling and the AdaIN filter to the latents."""
        if not self.latent_upsampler:
            raise ValueError("Latent upsampler is not loaded; cannot upscale.")
        latents_unnormalized = un_normalize_latents(latents, self.pipeline.vae, vae_per_channel_normalize=True)
        upsampled_latents_unnormalized = self.latent_upsampler(latents_unnormalized)
        upsampled_latents_normalized = normalize_latents(upsampled_latents_unnormalized, self.pipeline.vae, vae_per_channel_normalize=True)
        # AdaIN filter keeps color/style consistent with the low-resolution video
        return adain_filter_latent(latents=upsampled_latents_normalized, reference_latents=latents)
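    # Shape sketch (assumption: the spatial upsampler doubles the latent grid,
    # as LTX-Video's spatial upscaler does): a latent of shape (1, C, T, 16, 24)
    # would come back as (1, C, T, 32, 48). The un-normalize/normalize pair
    # round-trips the VAE's per-channel statistics so the upsampler operates
    # on raw latent values.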
    def _calculate_downscaled_dims(self, height: int, width: int) -> Tuple[int, int]:
        """Computes the dimensions for the first (low-resolution) pass."""
        height_padded = ((height - 1) // 8 + 1) * 8
        width_padded = ((width - 1) // 8 + 1) * 8
        downscale_factor = self.config.get("downscale_factor", 0.6666666)
        vae_scale_factor = self.pipeline.vae_scale_factor
        target_w = int(width_padded * downscale_factor)
        downscaled_width = target_w - (target_w % vae_scale_factor)
        target_h = int(height_padded * downscale_factor)
        downscaled_height = target_h - (target_h % vae_scale_factor)
        return downscaled_height, downscaled_width
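    # Worked example (assuming vae_scale_factor == 32): for a 720x1280 request,
    # padding to multiples of 8 keeps 720x1280, the ~2/3 downscale gives
    # 479x853, and snapping down to multiples of 32 yields a 448x832 first pass.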
    def _split_latents_with_overlap(self, latents: torch.Tensor, overlap: int = 1) -> List[torch.Tensor]:
        """Splits a latent tensor into two chunks that share `overlap` frames."""
        total_frames = latents.shape[2]
        if total_frames <= overlap:
            return [latents]
        mid_point = max(overlap, total_frames // 2)
        chunk1 = latents[:, :, :mid_point, :, :]
        # The second chunk starts `overlap` frames earlier to create the overlap
        chunk2 = latents[:, :, mid_point - overlap:, :, :]
        return [c for c in [chunk1, chunk2] if c.shape[2] > 0]
    def _merge_chunks_with_overlap(self, chunks: List[torch.Tensor], overlap: int = 1) -> torch.Tensor:
        """Concatenates a list of chunks, dropping the overlapping frames."""
        if not chunks:
            return torch.empty(0)
        if len(chunks) == 1:
            return chunks[0]
        # Take the first chunk without its trailing overlap frames
        merged_list = [chunks[0][:, :, :-overlap, :, :]]
        # Append the remaining chunks
        merged_list.extend(chunks[1:])
        return torch.cat(merged_list, dim=2)
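    # Round-trip sketch (hypothetical shapes): with the default overlap=1 and a
    # latent tensor of shape (1, C, 9, H, W), _split_latents_with_overlap yields
    # chunks of 4 and 6 frames (the second repeats the last frame of the first),
    # and _merge_chunks_with_overlap drops that duplicate before concatenating,
    # recovering all 9 frames along dim=2.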
    def _save_latents_to_disk(self, latents_tensor: torch.Tensor, base_filename: str, seed: int) -> str:
        """Saves a latent tensor to a .pt file."""
        latents_cpu = latents_tensor.detach().to("cpu")
        tensor_path = RESULTS_DIR / f"{base_filename}_{seed}.pt"
        torch.save(latents_cpu, tensor_path)
        if LTXV_DEBUG:
            print(f"[DEBUG] Latents saved to: {tensor_path}")
        return str(tensor_path)
    def _save_video_from_tensor(self, pixel_tensor: torch.Tensor, base_filename: str, seed: int, temp_dir: str, fps: int = int(DEFAULT_FPS)) -> str:
        """Saves a pixel tensor as an MP4 video file."""
        temp_path = os.path.join(temp_dir, f"{base_filename}_{seed}.mp4")
        video_encode_tool_singleton.save_video_from_tensor(pixel_tensor, temp_path, fps=fps)
        final_path = RESULTS_DIR / f"{base_filename}_{seed}.mp4"
        shutil.move(temp_path, final_path)
        print(f"[INFO] Final video saved to: {final_path}")
        return str(final_path)
    def _register_tmp_dir(self, dir_path: str):
        """Registers a temporary directory for later cleanup."""
        if dir_path and os.path.isdir(dir_path):
            self._tmp_dirs.add(dir_path)
            if LTXV_DEBUG:
                print(f"[DEBUG] Temporary directory registered: {dir_path}")
    def _seed_everything(self, seed: int):
        """Seeds Python, NumPy and torch RNGs for reproducibility."""
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed(seed)
        if torch.backends.mps.is_available():
            torch.mps.manual_seed(seed)
# ==============================================================================
# 3. INSTANTIATION AND ENTRY POINT (example)
# ==============================================================================
video_generation_service = VideoService()
print("Instância do VideoService pronta para uso.")