Test / api /vince_pool_manager.py
eeuuia's picture
Create vince_pool_manager.py
c8b13b1 verified
raw
history blame
10.3 kB
# FILE: api/vince_pool_manager.py
# DESCRIPTION: Singleton manager for a pool of VINCIE workers, integrated with a central GPU manager.
import os
import sys
import gc
import subprocess
import threading
from pathlib import Path
from typing import List
import torch
from omegaconf import open_dict
# --- Import do Gerenciador Central de GPUs ---
# Esta é a peça chave da integração. O Pool Manager perguntará a ele quais GPUs usar.
try:
from api.gpu_manager import gpu_manager
except ImportError as e:
print(f"ERRO CRÍTICO: Não foi possível importar o gpu_manager. {e}", file=sys.stderr)
sys.exit(1)
# --- Configurações Globais (Lidas do Ambiente) ---
VINCIE_DIR = Path(os.getenv("VINCIE_DIR", "/data/VINCIE"))
VINCIE_CKPT_DIR = Path(os.getenv("VINCIE_CKPT_DIR", "/data/ckpt/VINCIE-3B"))
# --- Classe Worker (Gerencia uma única GPU de forma isolada) ---
class VinceWorker:
"""
Gerencia uma única instância da pipeline VINCIE em um dispositivo GPU específico.
Opera em um ambiente "isolado" para garantir que só veja sua própria GPU.
"""
def __init__(self, device_id: str, config_path: str):
self.device_id_str = device_id
self.gpu_index_str = self.device_id_str.split(':')[-1]
self.config_path = config_path
self.gen = None
self.config = None
print(f"[VinceWorker-{self.device_id_str}] Inicializado. Mapeado para o índice de GPU físico {self.gpu_index_str}.")
def _execute_in_isolated_env(self, function_to_run, *args, **kwargs):
"""
Wrapper crucial que define CUDA_VISIBLE_DEVICES para isolar a visibilidade da GPU.
Isso garante que o PyTorch e o VINCIE só possam usar a GPU designada para este worker.
"""
original_cuda_visible = os.environ.get('CUDA_VISIBLE_DEVICES')
try:
os.environ['CUDA_VISIBLE_DEVICES'] = self.gpu_index_str
if torch.cuda.is_available():
# Dentro deste contexto, 'cuda:0' refere-se à nossa GPU alvo, pois é a única visível.
torch.cuda.set_device(0)
return function_to_run(*args, **kwargs)
finally:
# Restaura o ambiente original para não afetar outros threads/processos.
if original_cuda_visible is not None:
os.environ['CUDA_VISIBLE_DEVICES'] = original_cuda_visible
elif 'CUDA_VISIBLE_DEVICES' in os.environ:
del os.environ['CUDA_VISIBLE_DEVICES']
def _load_model_task(self):
"""Tarefa de carregamento do modelo, executada no ambiente isolado."""
print(f"[VinceWorker-{self.device_id_str}] Carregando modelo para VRAM (GPU física visível: {self.gpu_index_str})...")
# O dispositivo para o VINCIE será 'cuda:0' porque é a única GPU que este processo pode ver.
device_for_vincie = 'cuda:0' if torch.cuda.is_available() else 'cpu'
original_cwd = Path.cwd()
try:
# O código do VINCIE pode precisar ser executado de seu próprio diretório.
os.chdir(str(VINCIE_DIR))
# Adiciona o diretório ao path do sistema para encontrar os módulos do VINCIE.
if str(VINCIE_DIR) not in sys.path: sys.path.insert(0, str(VINCIE_DIR))
from common.config import load_config, create_object
cfg = load_config(self.config_path, [f"device='{device_for_vincie}'"])
self.gen = create_object(cfg)
self.config = cfg
# Executa os passos de configuração internos do VINCIE.
for name in ("configure_persistence", "configure_models", "configure_diffusion"):
getattr(self.gen, name)()
self.gen.to(torch.device(device_for_vincie))
print(f"[VinceWorker-{self.device_id_str}] ✅ Modelo VINCIE 'quente' e pronto na GPU física {self.gpu_index_str}.")
finally:
os.chdir(original_cwd) # Restaura o diretório de trabalho original.
def load_model_to_gpu(self):
"""Método público para carregar o modelo, garantindo o isolamento da GPU."""
if self.gen is None:
self._execute_in_isolated_env(self._load_model_task)
def _infer_task(self, **kwargs) -> Path:
"""Tarefa de inferência, executada no ambiente isolado."""
original_cwd = Path.cwd()
try:
os.chdir(str(VINCIE_DIR))
# Atualiza a configuração do gerador com os parâmetros da chamada atual.
with open_dict(self.gen.config):
self.gen.config.generation.output.dir = str(kwargs["output_dir"])
image_paths = kwargs.get("image_path", [])
self.gen.config.generation.positive_prompt.image_path = [str(p) for p in image_paths] if isinstance(image_paths, list) else [str(image_paths)]
if "prompts" in kwargs:
self.gen.config.generation.positive_prompt.prompts = list(kwargs["prompts"])
if "cfg_scale" in kwargs and kwargs["cfg_scale"] is not None:
self.gen.config.diffusion.cfg.scale = float(kwargs["cfg_scale"])
# Inicia o loop de inferência do VINCIE.
self.gen.inference_loop()
return Path(kwargs["output_dir"])
finally:
os.chdir(original_cwd)
# Limpeza de memória após a inferência.
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
def infer(self, **kwargs) -> Path:
"""Método público para iniciar a inferência, garantindo o isolamento da GPU."""
if self.gen is None:
raise RuntimeError(f"Modelo no worker {self.device_id_str} não foi carregado.")
return self._execute_in_isolated_env(self._infer_task, **kwargs)
# --- Classe Pool Manager (A Orquestradora Singleton) ---
class VincePoolManager:
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, output_root: str = "/app/outputs"):
if self._initialized: return
with self._lock:
if self._initialized: return
print("⚙️ Inicializando o VincePoolManager Singleton...")
self.output_root = Path(output_root)
self.output_root.mkdir(parents=True, exist_ok=True)
self.worker_lock = threading.Lock()
self.next_worker_idx = 0
# Pergunta ao gerenciador central quais GPUs ele pode usar.
self.allocated_gpu_indices = gpu_manager.get_vincie_devices()
if not self.allocated_gpu_indices:
# Se não houver GPUs alocadas, não podemos continuar.
# O setup.py já deve ter sido executado, então não precisamos verificar dependências aqui.
print("AVISO: Nenhuma GPU alocada para o VINCIE pelo GPUManager. O serviço VINCIE estará inativo.")
self.workers = []
self._initialized = True
return
devices = [f'cuda:{i}' for i in self.allocated_gpu_indices]
vincie_config_path = VINCIE_DIR / "configs/generate.yaml"
if not vincie_config_path.exists():
raise FileNotFoundError(f"Arquivo de configuração do VINCIE não encontrado em {vincie_config_path}")
self.workers = [VinceWorker(dev_id, str(vincie_config_path)) for dev_id in devices]
print(f"Iniciando carregamento dos modelos em paralelo para {len(self.workers)} GPUs VINCIE...")
threads = [threading.Thread(target=worker.load_model_to_gpu) for worker in self.workers]
for t in threads: t.start()
for t in threads: t.join()
self._initialized = True
print(f"✅ VincePoolManager pronto com {len(self.workers)} workers 'quentes'.")
def _get_next_worker(self) -> VinceWorker:
"""Seleciona o próximo worker disponível usando uma estratégia round-robin."""
if not self.workers:
raise RuntimeError("Não há workers VINCIE disponíveis para processar a tarefa.")
with self.worker_lock:
worker = self.workers[self.next_worker_idx]
self.next_worker_idx = (self.next_worker_idx + 1) % len(self.workers)
print(f"Tarefa despachada para o worker: {worker.device_id_str}")
return worker
def generate_multi_turn(self, input_image: str, turns: List[str], **kwargs) -> Path:
"""Gera um vídeo a partir de uma imagem e uma sequência de prompts (turnos)."""
worker = self._get_next_worker()
out_dir = self.output_root / f"vince_multi_turn_{Path(input_image).stem}_{os.urandom(4).hex()}"
out_dir.mkdir(parents=True)
infer_kwargs = {"output_dir": out_dir, "image_path": input_image, "prompts": turns, **kwargs}
return worker.infer(**infer_kwargs)
def generate_multi_concept(self, concept_images: List[str], concept_prompts: List[str], final_prompt: str, **kwargs) -> Path:
"""Gera um vídeo a partir de múltiplas imagens-conceito e um prompt final."""
worker = self._get_next_worker()
out_dir = self.output_root / f"vince_multi_concept_{os.urandom(4).hex()}"
out_dir.mkdir(parents=True)
all_prompts = concept_prompts + [final_prompt]
infer_kwargs = {"output_dir": out_dir, "image_path": concept_images, "prompts": all_prompts, **kwargs}
return worker.infer(**infer_kwargs)
# --- Instância Singleton Global ---
# A inicialização é envolvida em um try-except para evitar que a aplicação inteira quebre
# se o VINCIE não puder ser inicializado por algum motivo.
try:
output_root_path = os.getenv("OUTPUT_ROOT", "/app/outputs")
vince_pool_manager_singleton = VincePoolManager(output_root=output_root_path)
except Exception as e:
print(f"ERRO CRÍTICO ao inicializar o VincePoolManager: {e}", file=sys.stderr)
traceback.print_exc()
vince_pool_manager_singleton = None