| """Space 1: Extract Frames + Caption (Florence-2) |
| |
| Uploads videos -> extracts frames with face detection -> captions with Florence-2 -> saves to Hub. |
| GPU: T4 medium (~4GB VRAM for Florence-2) |
| """ |
| import gc |
| import json |
| import logging |
| import os |
| import shutil |
| import subprocess |
| import traceback |
| from pathlib import Path |
|
|
| import cv2 |
| import gradio as gr |
| import numpy as np |
| import torch |
| from PIL import Image |
|
|
| from hub_utils import upload_step, list_projects |
|
|
# Process-wide logging: timestamp, level, logger name, message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)


# Storage root: use /data only when running as a Space (SPACE_ID is set) and
# that directory exists and is writable; otherwise use a relative ./data.
IS_HF_SPACE = "SPACE_ID" in os.environ
_data_path = Path("/data")
_use_persistent = IS_HF_SPACE and _data_path.exists() and os.access(_data_path, os.W_OK)
BASE_DIR = _data_path if _use_persistent else Path("data")

FRAMES_DIR = BASE_DIR / "frames"      # selected frames, captions.json and .txt sidecars
TEMP_DIR = BASE_DIR / "temp"          # scratch space for raw ffmpeg output
HF_CACHE_DIR = BASE_DIR / "hf_cache"  # Hugging Face download cache

for _directory in (FRAMES_DIR, TEMP_DIR, HF_CACHE_DIR):
    _directory.mkdir(parents=True, exist_ok=True)

# Point the Hugging Face cache variables at the directory created above.
os.environ.update(
    {
        "HF_HOME": str(HF_CACHE_DIR),
        "TRANSFORMERS_CACHE": str(HF_CACHE_DIR),
    }
)

# Pipeline settings.
FLORENCE2_MODEL_ID = "microsoft/Florence-2-large"
FRAME_EXTRACT_FPS = 1        # frames per second requested from ffmpeg
MIN_SHARPNESS = 50.0         # minimum Laplacian variance of the face crop
TARGET_NUM_FRAMES = 100      # default value of the UI slider
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

APP_VERSION = "1.0.0"
|
|
| |
|
|
def _ffmpeg_extract_frames(video_path: str, output_dir: str, fps: float = 1.0):
    """Dump frames of *video_path* as JPEG files into *output_dir* using ffmpeg.

    Args:
        video_path: Path of the input video handed to ``ffmpeg -i``.
        output_dir: Directory for the frames; created if it does not exist.
        fps: Value used for the ``fps`` video filter.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status. The message
            carries the last 500 characters of ffmpeg's stderr.
    """
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # Files are written as frame_000001.jpg, frame_000002.jpg, ...
    frame_pattern = f"{output_dir}/frame_%06d.jpg"

    ffmpeg_args = ["ffmpeg", "-y", "-i", video_path]
    ffmpeg_args += ["-vf", f"fps={fps}"]
    ffmpeg_args += ["-qmin", "1", "-q:v", "2"]
    ffmpeg_args.append(frame_pattern)

    completed = subprocess.run(ffmpeg_args, capture_output=True, text=True)
    if completed.returncode == 0:
        return
    raise RuntimeError(f"FFmpeg failed: {completed.stderr[-500:]}")
|
|
|
|
| |
|
|
# Lazily created Haar-cascade classifier shared by all calls.
_face_net = None


def _get_face_detector():
    """Return the shared frontal-face Haar cascade, creating it on first use.

    Returns:
        The cached ``cv2.CascadeClassifier`` instance.

    Raises:
        RuntimeError: If the cascade file could not be loaded. An empty
            classifier is never cached, so a later call retries the load.
    """
    global _face_net
    if _face_net is None:
        cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
        detector = cv2.CascadeClassifier(cascade_path)
        # The constructor does not raise on a missing or unreadable file; it
        # yields an empty classifier. Fail here with a clear message instead
        # of later inside detectMultiScale.
        if detector.empty():
            raise RuntimeError(f"Could not load Haar cascade from {cascade_path}")
        _face_net = detector
    return _face_net
|
|
|
|
def _compute_sharpness(gray):
    """Return the variance of the Laplacian of *gray* as a sharpness measure.

    Higher values mean more high-frequency detail in the image.
    """
    laplacian = cv2.Laplacian(gray, cv2.CV_64F)
    return laplacian.var()
|
|
|
|
def _detect_faces(image_bgr):
    """Detect frontal faces in a BGR image.

    Returns:
        A list with one dict per detection. ``x``, ``y``, ``w`` and ``h`` are
        the box origin and size divided by the image width/height.
        ``confidence`` is a fixed 0.9, since only rectangles are read from
        the detector.
    """
    detector = _get_face_detector()
    height, width = image_bgr.shape[:2]
    grayscale = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
    boxes = detector.detectMultiScale(
        grayscale, scaleFactor=1.1, minNeighbors=5, minSize=(60, 60)
    )
    return [
        {
            "confidence": 0.9,
            "x": left / width,
            "y": top / height,
            "w": box_w / width,
            "h": box_h / height,
        }
        for (left, top, box_w, box_h) in boxes
    ]
|
|
|
|
def _score_frame(image_path):
    """Score one frame by the quality of its largest detected face.

    Returns:
        ``{"path", "sharpness", "score"}`` for a usable frame, or ``None``
        when the image cannot be read, contains no face, yields an empty face
        crop, or the crop's sharpness is below ``MIN_SHARPNESS``.
    """
    frame = cv2.imread(image_path)
    if frame is None:
        return None

    frame_h, frame_w = frame.shape[:2]
    detections = _detect_faces(frame)
    if not detections:
        return None

    # Only the largest face (by relative area) is considered.
    face = max(detections, key=lambda d: d["w"] * d["h"])

    # Convert the relative box back to pixels, clamping the origin at 0.
    left = max(0, int(face["x"] * frame_w))
    top = max(0, int(face["y"] * frame_h))
    box_w = int(face["w"] * frame_w)
    box_h = int(face["h"] * frame_h)
    crop = frame[top:top + box_h, left:left + box_w]
    if crop.size == 0:
        return None

    sharpness = _compute_sharpness(cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY))
    if sharpness < MIN_SHARPNESS:
        return None

    area_ratio = face["w"] * face["h"]

    # Distance of the face centre from (0.5, 0.45) in relative coordinates,
    # i.e. horizontally centred and slightly above the vertical middle.
    offset_x = abs(face["x"] + face["w"] / 2 - 0.5)
    offset_y = abs(face["y"] + face["h"] / 2 - 0.45)
    centering = 1.0 - (offset_x + offset_y)

    # Weighted sum: sharpness 0.4, confidence 0.3, face size 0.15, centring 0.15.
    sharp_term = sharpness / 500.0 * 0.4
    confidence_term = face["confidence"] * 0.3
    area_term = area_ratio * 10 * 0.15
    center_term = max(0, centering) * 0.15
    total = sharp_term + confidence_term + area_term + center_term

    return {"path": image_path, "sharpness": sharpness, "score": total}
|
|
|
|
| def _select_diverse(scored, target): |
| if len(scored) <= target: |
| return scored |
| candidates = scored[:target * 3] |
| candidates.sort(key=lambda x: x["path"]) |
| step = max(1, len(candidates) // target) |
| selected = candidates[::step][:target] |
| if len(selected) < target: |
| used = {s["path"] for s in selected} |
| for item in scored: |
| if item["path"] not in used: |
| selected.append(item) |
| if len(selected) >= target: |
| break |
| return selected |
|
|
|
|
def extract_and_select_frames(video_paths, num_frames, fps, progress_callback=None):
    """Extract frames from the videos, score them and store the selection.

    Raw frames are written under ``TEMP_DIR/raw_frames``, scored with
    ``_score_frame``, reduced to *num_frames* with ``_select_diverse`` and
    copied to ``FRAMES_DIR`` as ``frame_0000.jpg``, ``frame_0001.jpg``, ...
    ``FRAMES_DIR`` is emptied before the copy.

    Args:
        video_paths: Paths of the input videos.
        num_frames: Number of frames to keep.
        fps: Extraction rate passed to ffmpeg.
        progress_callback: Optional ``callback(fraction, message)``. Extraction
            reports fractions in 0-0.3 and scoring in 0.3-0.8.

    Returns:
        List of paths (as strings) of the frames written to ``FRAMES_DIR``.

    Raises:
        ValueError: If no frame passes scoring.
        RuntimeError: Propagated from ffmpeg extraction.
    """
    temp_frames_dir = TEMP_DIR / "raw_frames"
    if temp_frames_dir.exists():
        shutil.rmtree(temp_frames_dir)
    temp_frames_dir.mkdir(parents=True)

    try:
        all_frame_paths = []
        for i, vpath in enumerate(video_paths):
            if progress_callback:
                progress_callback(i / len(video_paths) * 0.3, f"Extrayendo frames del video {i+1}/{len(video_paths)}...")
            out_dir = temp_frames_dir / f"video_{i}"
            _ffmpeg_extract_frames(vpath, str(out_dir), fps)
            all_frame_paths.extend(str(f) for f in sorted(out_dir.glob("*.jpg")))

        logger.info(f"Extracted {len(all_frame_paths)} raw frames")

        scored = []
        for i, fpath in enumerate(all_frame_paths):
            # Report every 50th frame to keep UI updates cheap.
            if progress_callback and i % 50 == 0:
                progress_callback(0.3 + (i / len(all_frame_paths)) * 0.5, f"Puntuando frame {i+1}/{len(all_frame_paths)}...")
            result = _score_frame(fpath)
            if result:
                scored.append(result)

        if not scored:
            raise ValueError("No se encontraron frames validos con caras. Revisa la calidad del video.")

        # Best first, as _select_diverse takes its candidates from the head.
        scored.sort(key=lambda x: x["score"], reverse=True)
        selected = _select_diverse(scored, num_frames)

        # Replace any previous selection only once a new one is ready.
        output_dir = FRAMES_DIR
        if output_dir.exists():
            shutil.rmtree(output_dir)
        output_dir.mkdir(parents=True)

        output_paths = []
        for i, item in enumerate(selected):
            dst = output_dir / f"frame_{i:04d}.jpg"
            shutil.copy2(item["path"], dst)
            output_paths.append(str(dst))
    finally:
        # Remove the raw frames on failure too, so a failed run does not
        # leave them on disk.
        shutil.rmtree(temp_frames_dir, ignore_errors=True)

    logger.info(f"Selected {len(output_paths)} diverse, high-quality frames")
    return output_paths
|
|
|
|
| |
|
|
# Lazily loaded Florence-2 model and processor, shared by the caption helpers.
_florence_model = None
_florence_processor = None


def _load_florence2():
    """Load Florence-2 and its processor into the module globals if needed.

    The globals are assigned only after the model, the processor and the
    generation patch are all in place. A failure part-way through therefore
    leaves both globals untouched and the next call retries from scratch.
    """
    global _florence_model, _florence_processor
    if _florence_model is not None and _florence_processor is not None:
        return

    # Imported here so transformers is only loaded when captioning is used.
    from transformers import AutoModelForCausalLM, AutoProcessor

    logger.info(f"Loading Florence-2 from {FLORENCE2_MODEL_ID}...")
    # trust_remote_code=True runs the modelling code shipped in the model repo.
    model = AutoModelForCausalLM.from_pretrained(
        FLORENCE2_MODEL_ID,
        torch_dtype=torch.float16,
        trust_remote_code=True,
        attn_implementation="eager",
    ).to(DEVICE)
    processor = AutoProcessor.from_pretrained(
        FLORENCE2_MODEL_ID, trust_remote_code=True,
    )

    # Wrap prepare_inputs_for_generation: if the original raises AttributeError
    # or TypeError, fall back to a minimal input dict.
    # NOTE(review): presumably a workaround for a mismatch between the model's
    # remote code and the installed transformers version - confirm.
    original_prepare = model.language_model.prepare_inputs_for_generation

    def _patched(input_ids, past_key_values=None, **kwargs):
        try:
            return original_prepare(input_ids, past_key_values=past_key_values, **kwargs)
        except (AttributeError, TypeError):
            model_inputs = {"input_ids": input_ids}
            if "attention_mask" in kwargs:
                model_inputs["attention_mask"] = kwargs["attention_mask"]
            return model_inputs

    model.language_model.prepare_inputs_for_generation = _patched

    _florence_model, _florence_processor = model, processor
    logger.info("Florence-2 loaded")
|
|
|
|
def _unload_florence2():
    """Drop the cached Florence-2 model and processor and release memory.

    Does nothing when no model is loaded.
    """
    global _florence_model, _florence_processor
    if _florence_model is None:
        return

    # Move the weights off the device before dropping the references.
    _florence_model.to("cpu")
    _florence_model = None
    _florence_processor = None

    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
|
|
|
|
def caption_single(image_path):
    """Generate a detailed caption for one image with Florence-2.

    Loads the model on first use.

    Returns:
        The stripped caption, or ``"a photo of a person"`` when the model
        produces an empty string.
    """
    _load_florence2()

    rgb_image = Image.open(image_path).convert("RGB")
    task_prompt = "<MORE_DETAILED_CAPTION>"
    model_inputs = _florence_processor(text=task_prompt, images=rgb_image, return_tensors="pt")
    model_inputs = model_inputs.to(DEVICE, torch.float16)

    # Greedy decoding: one beam, no sampling.
    with torch.inference_mode():
        output_ids = _florence_model.generate(
            **model_inputs, max_new_tokens=150, num_beams=1, do_sample=False,
        )

    decoded = _florence_processor.batch_decode(output_ids, skip_special_tokens=True)
    caption = decoded[0].strip()
    return caption or "a photo of a person"
|
|
|
|
def caption_dataset(image_paths, progress_callback=None):
    """Caption every image and persist the captions next to the frames.

    Writes ``FRAMES_DIR/captions.json`` (file name -> caption) and one
    ``.txt`` sidecar per image, all encoded as UTF-8.

    Args:
        image_paths: Paths of the images to caption.
        progress_callback: Optional ``callback(fraction, message)``.

    Returns:
        Dict mapping each input path to its caption.

    Raises:
        ValueError: If *image_paths* is empty.
    """
    if not image_paths:
        raise ValueError("No hay imagenes para captar")

    captions = {}
    try:
        _load_florence2()
        for i, img_path in enumerate(image_paths):
            if progress_callback:
                progress_callback(i / len(image_paths), f"Captioning {i+1}/{len(image_paths)}...")
            captions[img_path] = caption_single(img_path)
            logger.info(f"[{i+1}/{len(image_paths)}] {Path(img_path).name}: {captions[img_path][:80]}...")
    finally:
        # Free the model even when captioning fails part-way through.
        _unload_florence2()

    # Key by file name so the JSON does not depend on this machine's paths.
    captions_file = FRAMES_DIR / "captions.json"
    portable = {Path(k).name: v for k, v in captions.items()}
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding is
    # set explicitly rather than left to the locale default.
    with open(captions_file, "w", encoding="utf-8") as f:
        json.dump(portable, f, indent=2, ensure_ascii=False)

    for img_path, caption in captions.items():
        Path(img_path).with_suffix(".txt").write_text(caption, encoding="utf-8")

    return captions
|
|
|
|
| |
|
|
def process_videos(project_name, videos, num_frames, progress=gr.Progress()):
    """Gradio handler: extract frames from the uploaded videos and caption them.

    Args:
        project_name: Must be non-blank; only validated here.
        videos: Uploaded files, either objects with a ``name`` attribute or
            plain paths.
        num_frames: Number of frames to keep (converted with ``int``).
        progress: Gradio progress tracker. Extraction fills the first half of
            the bar and captioning the second.

    Returns:
        ``(gallery, status)``. ``gallery`` is a list of ``(path, label)``
        tuples, or ``None`` on a validation error or failure.
    """
    if not (project_name and project_name.strip()):
        return None, "Error: Debes introducir un nombre de proyecto"
    if not videos:
        return None, "Error: No se han subido videos"

    video_paths = [getattr(v, "name", v) for v in videos]
    logger.info(f"=== Frame Extraction Started === Videos: {len(video_paths)}, Target: {num_frames}")

    def _report_extraction(fraction, message):
        # Map extraction progress onto 0.0-0.5 of the overall bar.
        return progress(fraction * 0.5, desc=message)

    def _report_captioning(fraction, message):
        # Map captioning progress onto 0.5-1.0 of the overall bar.
        return progress(0.5 + fraction * 0.5, desc=message)

    try:
        progress(0.0, desc="Extrayendo frames...")
        frame_paths = extract_and_select_frames(
            video_paths,
            num_frames=int(num_frames),
            fps=FRAME_EXTRACT_FPS,
            progress_callback=_report_extraction,
        )

        progress(0.5, desc="Captioning con Florence-2...")
        captions = caption_dataset(frame_paths, progress_callback=_report_captioning)

        gallery = [(p, Path(p).stem) for p in frame_paths]
        status = f"OK - {len(frame_paths)} frames extraidos, {len(captions)} captions generados"
        logger.info(f"=== Frame Extraction Complete === {status}")
        return gallery, status
    except Exception as e:
        # UI boundary: log the traceback and show the error in the status box.
        logger.error(f"=== Frame Extraction Failed ===\n{traceback.format_exc()}")
        return None, f"Error: {e}"
|
|
|
|
def save_to_hub(project_name):
    """Gradio handler: upload the contents of ``FRAMES_DIR`` to the Hub.

    Args:
        project_name: Project name; surrounding whitespace is stripped.

    Returns:
        An error string when the name is blank, no ``.jpg`` frame exists or
        the upload raises; otherwise whatever ``upload_step`` returns
        (presumably a status message - confirm against hub_utils).
    """
    if not project_name or not project_name.strip():
        return "Error: Debes introducir un nombre de proyecto"
    name = project_name.strip()

    # any() stops at the first match instead of listing every frame.
    if not any(FRAMES_DIR.glob("*.jpg")):
        return "Error: No hay frames para guardar. Procesa videos primero."

    try:
        return upload_step(name, "step1_frames", str(FRAMES_DIR))
    except Exception as e:
        # UI boundary: keep the traceback in the logs, as process_videos does,
        # and show only the message to the user.
        logger.exception("=== Hub Upload Failed ===")
        return f"Error: {e}"
|
|
|
|
def delete_selected_frame(gallery, selected_index):
    """Gradio handler: delete the selected frame, its caption file and JSON entry.

    Args:
        gallery: Current gallery value; items are paths or sequences whose
            first element is the path.
        selected_index: Index of the item to delete. Any value accepted by
            ``int()`` is allowed; ``None`` or a non-numeric value means no
            selection.

    Returns:
        ``(gallery, message)``. On success the gallery is rebuilt from the
        ``.jpg`` files left in ``FRAMES_DIR``; otherwise the input gallery is
        returned unchanged.
    """
    if gallery is None or selected_index is None:
        return gallery, "Selecciona una imagen para eliminar"
    try:
        # The UI widget may deliver a float; NaN, inf and non-numeric values
        # are treated as no selection.
        index = int(selected_index)
    except (TypeError, ValueError, OverflowError):
        return gallery, "Selecciona una imagen para eliminar"
    if index < 0 or index >= len(gallery):
        return gallery, "Indice fuera de rango"

    item = gallery[index]
    img_path = Path(item[0] if isinstance(item, (list, tuple)) else item)

    # Match by file name only: the gallery may hold a copy of the image at
    # another location, and this keeps deletions inside FRAMES_DIR.
    frame_file = FRAMES_DIR / img_path.name
    if frame_file.suffix != ".jpg" or not frame_file.is_file():
        return gallery, "No se encontro el archivo para eliminar"
    frame_file.unlink(missing_ok=True)
    frame_file.with_suffix(".txt").unlink(missing_ok=True)

    captions_file = FRAMES_DIR / "captions.json"
    if captions_file.exists():
        try:
            with open(captions_file, encoding="utf-8") as f:
                captions = json.load(f)
        except ValueError:
            # The frame is already gone; a corrupt index must not abort the
            # gallery refresh. Leave the file as it is.
            logger.warning("captions.json could not be parsed; leaving it untouched")
        else:
            captions.pop(img_path.name, None)
            with open(captions_file, "w", encoding="utf-8") as f:
                json.dump(captions, f, indent=2, ensure_ascii=False)

    remaining = sorted(FRAMES_DIR.glob("*.jpg"))
    new_gallery = [(str(p), p.stem) for p in remaining]
    return new_gallery, f"Eliminado. Quedan {len(remaining)} frames"
|
|
|
|
| |
|
|
# UI definition. NOTE(review): the source carried no indentation, so the
# nesting of rows and columns below is reconstructed - confirm the layout.
with gr.Blocks(title="Talking Head - Frames", theme=gr.themes.Soft()) as demo:
    gr.Markdown(f"# Talking Head - Extraer Frames `v{APP_VERSION}`\nExtrae frames con deteccion facial y genera captions con Florence-2")

    project_name = gr.Textbox(
        label="Nombre del proyecto",
        placeholder="mi_proyecto",
        info="Obligatorio. Se usa como carpeta en el Hub.",
    )

    with gr.Row():
        # Left column: inputs.
        with gr.Column():
            video_input = gr.File(
                label="Videos (MP4/MOV/AVI/MKV)", file_count="multiple",
                file_types=[".mp4", ".mov", ".avi", ".mkv"],
            )
            num_frames = gr.Slider(20, 200, value=TARGET_NUM_FRAMES, step=10, label="Numero de frames a extraer")
            process_btn = gr.Button("Procesar Videos", variant="primary")
        # Right column: results and frame deletion.
        with gr.Column():
            frame_gallery = gr.Gallery(label="Frames extraidos", columns=5, height=500, object_fit="contain")
            with gr.Row():
                # Starts empty so "Eliminar frame" does nothing until the user
                # selects a frame, rather than deleting index 0.
                selected_idx = gr.Number(value=None, label="Indice seleccionado", precision=0)
                delete_btn = gr.Button("Eliminar frame", variant="stop", size="sm")
            status_box = gr.Textbox(label="Estado", interactive=False)

    save_btn = gr.Button("Guardar en Hub", variant="secondary")
    save_status = gr.Textbox(label="Estado guardado", interactive=False)

    def on_gallery_select(evt: gr.SelectData):
        """Return the index of the gallery item the user clicked."""
        return evt.index

    def _clear_selection():
        """Reset the selected index after the gallery contents change."""
        return None

    frame_gallery.select(fn=on_gallery_select, inputs=None, outputs=[selected_idx])

    # After the gallery is rebuilt the old index would point at a different
    # frame, so both handlers below clear it when they finish.
    process_btn.click(
        process_videos,
        inputs=[project_name, video_input, num_frames],
        outputs=[frame_gallery, status_box],
    ).then(fn=_clear_selection, inputs=None, outputs=[selected_idx])
    delete_btn.click(
        delete_selected_frame,
        inputs=[frame_gallery, selected_idx],
        outputs=[frame_gallery, status_box],
    ).then(fn=_clear_selection, inputs=None, outputs=[selected_idx])
    save_btn.click(save_to_hub, inputs=[project_name], outputs=[save_status])


if __name__ == "__main__":
    # Listen on all interfaces on port 7860.
    demo.queue().launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)
|
|