Spaces:
Runtime error
Runtime error
| import os | |
| import cv2 | |
| import faiss | |
| import pickle | |
| import numpy as np | |
| import pandas as pd | |
| from pathlib import Path | |
| import insightface | |
| import albumentations as A | |
| # π§ μ¦κ° μ€μ | |
| augment = A.Compose([ | |
| A.HorizontalFlip(p=0.5), | |
| A.RandomBrightnessContrast(p=0.3), | |
| A.Rotate(limit=15, p=0.3), | |
| ]) | |
| # π λͺ¨λΈ μ΄κΈ°ν ν¨μ | |
| def load_face_model(device: str = "cpu"): | |
| providers = ["CPUExecutionProvider"] if device == "cpu" else ["CUDAExecutionProvider"] | |
| model = insightface.app.FaceAnalysis(name='buffalo_l', providers=providers) | |
| model.prepare(ctx_id=0 if device != "cpu" else -1) | |
| return model | |
| # π μλ² λ© μΆμΆ ν¨μ | |
| def get_face_embedding(image_path: str, model, n_augment: int = 5): | |
| img = cv2.imread(str(image_path)) | |
| img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
| embeddings = [] | |
| # μλ³Έ | |
| faces = model.get(img) | |
| if faces: | |
| embeddings.append(faces[0].embedding) | |
| else: | |
| print(f"β μΌκ΅΄ μΈμ μ€ν¨ (μλ³Έ): {image_path}") | |
| # μ¦κ° | |
| for i in range(n_augment): | |
| augmented = augment(image=img) | |
| img_aug = augmented['image'] | |
| faces = model.get(img_aug) | |
| if faces: | |
| embeddings.append(faces[0].embedding) | |
| else: | |
| print(f"β μΌκ΅΄ μΈμ μ€ν¨ (μ¦κ° {i+1}): {image_path}") | |
| if embeddings: | |
| return np.mean(embeddings, axis=0) | |
| else: | |
| print(f"β λͺ¨λ μλ μ€ν¨: {image_path}") | |
| return None | |
| # π ν΄λ μ€μΊ λ° μλ² λ© μΆμΆ | |
| def process_folder(data_folder: str, model) -> pd.DataFrame: | |
| data = [] | |
| data_path = Path(data_folder) | |
| for person_dir in data_path.iterdir(): | |
| if not person_dir.is_dir(): | |
| continue | |
| label = person_dir.name | |
| print(f"βΆ ν΄λ: {label}") | |
| count = 0 | |
| for image_path in person_dir.glob("*"): | |
| if image_path.suffix.lower() not in [".jpg", ".jpeg", ".png"]: | |
| continue | |
| emb = get_face_embedding(image_path, model) | |
| if emb is not None: | |
| data.append({ | |
| "label": label, | |
| "image_path": str(image_path), | |
| "embedding": emb | |
| }) | |
| count += 1 | |
| print(f"β μΌκ΅΄ μΈμ μ±κ³΅ μ: {count}") | |
| return pd.DataFrame(data) | |
| # π FAISS μΈλ±μ€ μμ± λ° μ μ₯ | |
| def build_and_save_faiss(train_df: pd.DataFrame, save_path: str): | |
| embeddings = np.stack(train_df['embedding'].values).astype('float32') | |
| embeddings /= np.linalg.norm(embeddings, axis=1, keepdims=True) | |
| index = faiss.IndexFlatIP(embeddings.shape[1]) | |
| index.add(embeddings) | |
| faiss.write_index(index, os.path.join(save_path, "faiss_index.index")) | |
| labels = train_df['label'].tolist() | |
| with open(os.path.join(save_path, "faiss_labels.pkl"), "wb") as f: | |
| pickle.dump(labels, f) | |
| # μ 체 λ°μ΄ν°νλ μ μ μ₯ (μ ν) | |
| train_df.to_pickle(os.path.join(save_path, "train_df.pkl")) | |
| print("β FAISS μΈλ±μ€ & λΌλ²¨ μ μ₯ μλ£") | |
| return index, labels, train_df | |
| # π μ 체 μ€ν ν¨μ | |
| def run_pipeline(data_folder: str, save_path: str, device: str = "cpu"): | |
| os.makedirs(save_path, exist_ok=True) | |
| print("π μΌκ΅΄ λͺ¨λΈ λΆλ¬μ€λ μ€...") | |
| model = load_face_model(device) | |
| print("π μλ² λ© μΆμΆ μμ...") | |
| train_df = process_folder(data_folder, model) | |
| print("π FAISS μΈλ±μ€ μμ± λ° μ μ₯ μ€...") | |
| index, labels, df = build_and_save_faiss(train_df, save_path) | |
| return index, labels, df | |
| data_folder = "./person" | |
| save_path = "./embedding/person" | |
| index, labels, df = run_pipeline(data_folder, save_path, device="cpu") |