import numpy as np
from PIL import Image
import gradio as gr
from deepface import DeepFace
from datasets import load_dataset
import os
import pickle
from io import BytesIO
from huggingface_hub import upload_file, hf_hub_download, list_repo_files
from pathlib import Path
import gc
import requests
import time
import shutil
import tarfile
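
# Pipeline overview: clean local temp folders, load image URLs from metadata.csv,
# compute a Facenet embedding per image with DeepFace, pickle each result, pack
# the pickles into .tar.gz archives uploaded to the dataset repo, then serve a
# Gradio app that compares an uploaded face against the stored embeddings.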
# 🔁 Clear temporary storage if it exists
def clean_temp_dirs():
    print("🧹 Cleaning temporary folders...")
    for folder in ["embeddings", "batches"]:
        path = Path(folder)
        if path.exists() and path.is_dir():
            shutil.rmtree(path)
            print(f"✅ Folder removed: {folder}")
        path.mkdir(exist_ok=True)

clean_temp_dirs()
# 📁 Parameters
DATASET_ID = "Segizu/facial-recognition"
EMBEDDINGS_SUBFOLDER = "embeddings"
LOCAL_EMB_DIR = Path("embeddings")
LOCAL_EMB_DIR.mkdir(exist_ok=True)
HF_TOKEN = os.getenv("HF_TOKEN")
headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

# 💾 Storage-control settings
MAX_TEMP_STORAGE_GB = 40
UPLOAD_EVERY = 50
embeddings_to_upload = []
def get_folder_size(path):
    # Walks every file under `path`; note that passing "." measures the whole
    # working directory, including any model caches stored there.
    total = 0
    for dirpath, _, filenames in os.walk(path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            total += os.path.getsize(fp)
    return total / (1024 ** 3)  # in GB
def flush_embeddings():
    global embeddings_to_upload
    print("🚀 Uploading batch of embeddings to Hugging Face...")
    for emb_file in embeddings_to_upload:
        try:
            filename = emb_file.name
            upload_file(
                path_or_fileobj=str(emb_file),
                path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{filename}",
                repo_id=DATASET_ID,
                repo_type="dataset",
                token=HF_TOKEN
            )
            os.remove(emb_file)
            print(f"✅ Uploaded and removed: {filename}")
            time.sleep(1.2)  # Avoid 429 rate limits
        except Exception as e:
            print(f"❌ Error uploading {filename}: {e}")
            continue
    embeddings_to_upload = []
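
# Note: flush_embeddings() uploads loose .pkl files one by one; the archive-based
# flow in build_database() below never calls it, so it is kept only as an
# alternative upload path.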
# ✅ Load the CSV from the dataset
dataset = load_dataset(
    "csv",
    data_files="metadata.csv",
    split="train",
    column_names=["image"],
    header=0
)

print("✅ Post-load validation")
print(dataset[0])
print("Columns:", dataset.column_names)
# 🔄 Preprocessing
def preprocess_image(img: Image.Image) -> np.ndarray:
    img_rgb = img.convert("RGB")
    img_resized = img_rgb.resize((160, 160), Image.Resampling.LANCZOS)
    return np.array(img_resized)
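
# Facenet models are typically trained on 160x160 RGB crops, which is why
# preprocess_image resizes to that shape before DeepFace.represent is called.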
def build_database():
    print(f"📊 Temporary storage usage (start): {get_folder_size('.'):.2f} GB")
    print("🔄 Generating embeddings...")
    batch_size = 10
    archive_batch_size = 50
    batch_files = []
    batch_index = 0
    ARCHIVE_DIR = Path("batches")
    ARCHIVE_DIR.mkdir(exist_ok=True)

    for i in range(0, len(dataset), batch_size):
        batch = dataset[i:i + batch_size]
        print(f"📦 Batch {i // batch_size + 1}/{(len(dataset) + batch_size - 1) // batch_size}")

        for j in range(len(batch["image"])):
            image_url = batch["image"][j]

            # Skip rows that are not valid URLs (e.g. a stray header row named "image")
            if not isinstance(image_url, str) or not image_url.startswith("http") or image_url.strip().lower() == "image":
                print(f"⚠️ Skipping {i + j} - invalid URL: {image_url}")
                continue

            name = f"image_{i + j}"
            filename = LOCAL_EMB_DIR / f"{name}.pkl"

            # Resume heuristic: if the archive for the current batch index already
            # exists on the Hub, assume this image was processed and skip it.
            try:
                hf_hub_download(
                    repo_id=DATASET_ID,
                    repo_type="dataset",
                    filename=f"{EMBEDDINGS_SUBFOLDER}/batch_{batch_index:03}.tar.gz",
                    token=HF_TOKEN
                )
                print(f"⏩ Archive batch_{batch_index:03}.tar.gz already on the Hub; skipping {name}")
                continue
            except Exception:
                pass
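
            # Download the image, compute its Facenet embedding, and stage the
            # resulting .pkl so it can be packed into the next archive.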
            try:
                response = requests.get(image_url, headers=headers, timeout=10)
                response.raise_for_status()
                img = Image.open(BytesIO(response.content)).convert("RGB")
                img_processed = preprocess_image(img)
                embedding = DeepFace.represent(
                    img_path=img_processed,
                    model_name="Facenet",
                    enforce_detection=False
                )[0]["embedding"]

                with open(filename, "wb") as f:
                    pickle.dump({"name": name, "img": img, "embedding": embedding}, f)
                batch_files.append(filename)

                del img_processed
                gc.collect()

                # Pack and upload once the archive is full or local storage is critical
                if len(batch_files) >= archive_batch_size or get_folder_size(".") > MAX_TEMP_STORAGE_GB:
                    archive_path = ARCHIVE_DIR / f"batch_{batch_index:03}.tar.gz"
                    with tarfile.open(archive_path, "w:gz") as tar:
                        for file in batch_files:
                            tar.add(file, arcname=file.name)
                    print(f"📦 Packed: {archive_path}")

                    # Upload to the Hub
                    upload_file(
                        path_or_fileobj=str(archive_path),
                        path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}",
                        repo_id=DATASET_ID,
                        repo_type="dataset",
                        token=HF_TOKEN
                    )
                    print(f"✅ Uploaded: {archive_path.name}")

                    # Remove the local .pkl files and the local .tar.gz
                    for f in batch_files:
                        f.unlink()
                    archive_path.unlink()
                    print("🧹 Cleanup completed after upload")

                    batch_files = []
                    batch_index += 1
                    time.sleep(2)  # Pause to avoid 429 rate limits

                print(f"📊 Temporary storage usage (current): {get_folder_size('.'):.2f} GB")
            except Exception as e:
                print(f"❌ Error on {name}: {e}")
                continue
    # Final batch, if anything is left over
    if batch_files:
        archive_path = ARCHIVE_DIR / f"batch_{batch_index:03}.tar.gz"
        with tarfile.open(archive_path, "w:gz") as tar:
            for file in batch_files:
                tar.add(file, arcname=file.name)
        print(f"📦 Final pack: {archive_path}")

        upload_file(
            path_or_fileobj=str(archive_path),
            path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}",
            repo_id=DATASET_ID,
            repo_type="dataset",
            token=HF_TOKEN
        )

        for f in batch_files:
            f.unlink()
        archive_path.unlink()
        print("✅ Final upload and cleanup done")
# 🔍 Search for similar faces using the remote embedding files
def find_similar_faces(uploaded_image: Image.Image):
    try:
        img_processed = preprocess_image(uploaded_image)
        query_embedding = DeepFace.represent(
            img_path=img_processed,
            model_name="Facenet",
            enforce_detection=False
        )[0]["embedding"]
        del img_processed
        gc.collect()
    except Exception as e:
        return [], f"⚠ Error processing image: {str(e)}"

    similarities = []

    try:
        # build_database() uploads .tar.gz archives, but loose .pkl files may also
        # exist (see flush_embeddings), so accept both.
        embedding_files = [
            f for f in list_repo_files(DATASET_ID, repo_type="dataset", token=HF_TOKEN)
            if f.startswith(f"{EMBEDDINGS_SUBFOLDER}/") and f.endswith((".pkl", ".tar.gz"))
        ]
    except Exception as e:
        return [], f"⚠ Error listing files: {str(e)}"
    for file_path in embedding_files:
        try:
            file_bytes = requests.get(
                f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/{file_path}",
                headers=headers,
                timeout=10
            ).content

            # Archives contain many pickled records; a bare .pkl holds one.
            if file_path.endswith(".tar.gz"):
                with tarfile.open(fileobj=BytesIO(file_bytes), mode="r:gz") as tar:
                    records = [
                        pickle.load(tar.extractfile(member))
                        for member in tar.getmembers()
                        if member.isfile()
                    ]
            else:
                records = [pickle.loads(file_bytes)]

            for record in records:
                name = record["name"]
                img = record["img"]
                emb = record["embedding"]
                dist = np.linalg.norm(np.array(query_embedding) - np.array(emb))
                sim_score = 1 / (1 + dist)  # maps L2 distance into (0, 1]
                similarities.append((sim_score, name, np.array(img)))
        except Exception as e:
            print(f"⚠ Error with {file_path}: {e}")
            continue

    # Sort by score only; tie-breaking on the numpy arrays in the tuples would raise
    similarities.sort(key=lambda x: x[0], reverse=True)
    top = similarities[:5]

    gallery = [(img, f"{name} - similarity: {sim:.2f}") for sim, name, img in top]
    summary = "\n".join(f"{name} - similarity: {sim:.2f}" for sim, name, _ in top)
    return gallery, summary
# 🚀 Initialize
print("🚀 Starting app...")
build_database()

# 🎛️ Gradio interface
demo = gr.Interface(
    fn=find_similar_faces,
    inputs=gr.Image(label="📤 Upload an image", type="pil"),
    outputs=[
        gr.Gallery(label="📸 Similar faces"),
        gr.Textbox(label="🧠 Details", lines=6)
    ],
    title="🔍 Face recognition with DeepFace",
    description="Upload an image and find matches in the private Hugging Face dataset using Facenet embeddings."
)

demo.launch()