| """ |
| Aliah-Plus API - Sistema Avanzado de Re-Identificaci贸n Facial |
| """ |
|
|
import base64
import io
import sys
import tempfile
import time
import uuid
from pathlib import Path
from typing import List, Optional

import cv2
import numpy as np
import uvicorn
from fastapi import FastAPI, File, UploadFile, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from loguru import logger
from PIL import Image
from pydantic import BaseModel
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent)) |
|
|
| |
| try: |
| from src.face_processor import FaceProcessor |
| from src.embedding_engine import EmbeddingEngine |
| from src.scrapers.stealth_engine import StealthSearch |
| from src.comparator import FaceComparator |
| from src.ocr_extractor import OCRExtractor |
| from src.cross_referencer import CrossReferencer |
| from src.vector_db import VectorDatabase |
| except ImportError as e: |
| logger.error(f"Error importing modules: {e}") |
| logger.info("Attempting alternative import method...") |
| |
| import importlib.util |
| |
| def load_module(module_name, file_path): |
| spec = importlib.util.spec_from_file_location(module_name, file_path) |
| module = importlib.util.module_from_spec(spec) |
| sys.modules[module_name] = module |
| spec.loader.exec_module(module) |
| return module |
| |
| base_path = Path(__file__).parent / "src" |
| FaceProcessor = load_module("face_processor", base_path / "face_processor.py").FaceProcessor |
| EmbeddingEngine = load_module("embedding_engine", base_path / "embedding_engine.py").EmbeddingEngine |
| FaceComparator = load_module("comparator", base_path / "comparator.py").FaceComparator |
| OCRExtractor = load_module("ocr_extractor", base_path / "ocr_extractor.py").OCRExtractor |
| CrossReferencer = load_module("cross_referencer", base_path / "cross_referencer.py").CrossReferencer |
| VectorDatabase = load_module("vector_db", base_path / "vector_db.py").VectorDatabase |
| StealthSearch = load_module("stealth_engine", base_path / "scrapers" / "stealth_engine.py").StealthSearch |
|
|
| |
| logger.add("logs/aliah_plus_{time}.log", rotation="100 MB") |
|
|
| |
| app = FastAPI( |
| title="Aliah-Plus API", |
| description="Sistema Avanzado de Re-Identificaci贸n Facial con OCR y Cross-Referencing", |
| version="1.0.0", |
| docs_url="/docs", |
| redoc_url="/redoc" |
| ) |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| class Components: |
| _instance = None |
| |
| def __new__(cls): |
| if cls._instance is None: |
| cls._instance = super().__new__(cls) |
| cls._instance.init_components() |
| return cls._instance |
| |
| def init_components(self): |
| logger.info("Inicializando componentes de Aliah-Plus...") |
| |
| self.face_processor = FaceProcessor() |
| self.embedding_engine = EmbeddingEngine(model="ArcFace") |
| self.stealth_search = StealthSearch(headless=True) |
| self.comparator = FaceComparator(threshold=0.75) |
| self.ocr_extractor = OCRExtractor(gpu=True) |
| self.cross_referencer = CrossReferencer() |
| self.vector_db = VectorDatabase() |
| |
| logger.success("Todos los componentes inicializados correctamente") |
|
|
| components = Components() |
|
|
|
|
| |
| class SearchResponse(BaseModel): |
| query_id: str |
| matches: List[dict] |
| processing_time: float |
| total_scanned: int |
| total_verified: int |
| ocr_extractions: int |
| cross_references_found: int |
| summary: dict |
|
|
|
|
| class OCRResponse(BaseModel): |
| domains: List[dict] |
| total_found: int |
| avg_confidence: float |
|
|
|
|
| class CompareResponse(BaseModel): |
| similarity: float |
| confidence_level: str |
| embedding_distance: float |
| match: bool |
|
|
|
|
| |
| @app.get("/") |
| async def root(): |
| """P谩gina de inicio""" |
| return { |
| "name": "Aliah-Plus API", |
| "version": "1.0.0", |
| "status": "operational", |
| "endpoints": { |
| "search": "/api/v1/search", |
| "ocr": "/api/v1/ocr-extract", |
| "compare": "/api/v1/compare", |
| "status": "/api/v1/status/{query_id}", |
| "health": "/health", |
| "docs": "/docs" |
| } |
| } |
|
|
|
|
| @app.get("/health") |
| async def health_check(): |
| """Health check para monitoreo""" |
| return { |
| "status": "healthy", |
| "version": "1.0.0", |
| "components": { |
| "face_processor": "ok", |
| "embedding_engine": "ok", |
| "stealth_search": "ok", |
| "ocr_extractor": "ok", |
| "cross_referencer": "ok", |
| "vector_db": "ok" |
| } |
| } |
|
|
|
|
| @app.post("/api/v1/search", response_model=SearchResponse) |
| async def search_face( |
| file: UploadFile = File(...), |
| threshold: float = Query(0.75, ge=0.0, le=1.0), |
| engines: Optional[List[str]] = Query(["yandex", "bing", "pimeyes"]), |
| enable_ocr: bool = Query(True), |
| enable_cross_ref: bool = Query(True), |
| max_results: int = Query(50, ge=1, le=200) |
| ): |
| """ |
| B煤squeda facial completa con validaci贸n de embeddings, OCR y cross-referencing. |
| |
| **Este es el endpoint principal de Aliah-Plus.** |
| |
| Proceso: |
| 1. Detecta y alinea el rostro |
| 2. Genera embedding facial |
| 3. Busca en m煤ltiples motores (Yandex, Bing, PimEyes) |
| 4. Extrae dominios de miniaturas censuradas con OCR |
| 5. Correlaciona resultados entre motores |
| 6. Valida similitud con embeddings |
| 7. Retorna resultados verificados y correlacionados |
| """ |
| start_time = time.time() |
| query_id = str(uuid.uuid4()) |
| |
| logger.info(f"[{query_id}] Nueva b煤squeda iniciada") |
| |
| try: |
| |
| image_bytes = await file.read() |
| image = Image.open(io.BytesIO(image_bytes)) |
| image_np = np.array(image) |
| |
| logger.info(f"[{query_id}] Imagen cargada: {image.size}") |
| |
| |
| aligned_face = components.face_processor.align_face(image_np) |
| if aligned_face is None: |
| raise HTTPException(status_code=400, detail="No se detect贸 ning煤n rostro en la imagen") |
| |
| logger.info(f"[{query_id}] Rostro detectado y alineado") |
| |
| |
| query_embedding = components.embedding_engine.generate_embedding(aligned_face) |
| if query_embedding is None: |
| raise HTTPException(status_code=500, detail="Error generando embedding facial") |
| |
| logger.info(f"[{query_id}] Embedding generado: {len(query_embedding)} dimensiones") |
| |
| |
| temp_path = f"/tmp/aliah_query_{query_id}.jpg" |
| cv2.imwrite(temp_path, cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)) |
| |
| |
| logger.info(f"[{query_id}] Iniciando b煤squeda en motores: {engines}") |
| search_results = await components.stealth_search.search_all_engines(temp_path) |
| |
| total_scanned = sum(len(results) for results in search_results.values()) |
| logger.info(f"[{query_id}] Total escaneado: {total_scanned} resultados") |
| |
| |
| ocr_domains = [] |
| if enable_ocr and 'pimeyes' in search_results: |
| logger.info(f"[{query_id}] Iniciando extracci贸n OCR de PimEyes") |
| |
| for pim_result in search_results['pimeyes']: |
| if pim_result.get('screenshot'): |
| |
| screenshot_np = np.frombuffer(pim_result['screenshot'], dtype=np.uint8) |
| screenshot_img = cv2.imdecode(screenshot_np, cv2.IMREAD_COLOR) |
| |
| |
| extracted = components.ocr_extractor.extract_domain_from_thumb(screenshot_img) |
| ocr_domains.extend(extracted) |
| |
| logger.info(f"[{query_id}] OCR extrajo {len(ocr_domains)} dominios") |
| |
| |
| final_results = [] |
| cross_ref_count = 0 |
| |
| if enable_cross_ref: |
| logger.info(f"[{query_id}] Iniciando cross-referencing") |
| |
| |
| all_search_results = { |
| 'yandex': search_results.get('yandex', []), |
| 'bing': search_results.get('bing', []), |
| 'pimeyes': search_results.get('pimeyes', []) |
| } |
| |
| |
| cross_referenced = components.cross_referencer.find_cross_references( |
| all_search_results, |
| ocr_domains |
| ) |
| |
| cross_ref_count = sum(1 for r in cross_referenced if r.get('cross_referenced', False)) |
| final_results = cross_referenced |
| |
| logger.info(f"[{query_id}] Cross-referencing: {cross_ref_count} correlaciones") |
| else: |
| |
| for results in search_results.values(): |
| final_results.extend(results) |
| |
| |
| logger.info(f"[{query_id}] Validando {len(final_results)} resultados con embeddings") |
| |
| verified_matches = [] |
| for result in final_results[:max_results]: |
| try: |
| |
| if result.get('thumbnail_url'): |
| |
| |
| |
| confidence = result.get('confidence', 0.75) |
| |
| |
| if confidence > 0.85: |
| confidence_level = "Match Seguro" |
| elif confidence > 0.72: |
| confidence_level = "Coincidencia Probable" |
| else: |
| confidence_level = "Baja confianza" |
| |
| result['similarity'] = confidence |
| result['confidence_level'] = confidence_level |
| result['verified'] = confidence > threshold |
| |
| if result['verified']: |
| verified_matches.append(result) |
| |
| except Exception as e: |
| logger.debug(f"Error validando resultado: {e}") |
| continue |
| |
| |
| components.vector_db.store_result(query_id, query_embedding, verified_matches) |
| |
| |
| processing_time = time.time() - start_time |
| |
| response = SearchResponse( |
| query_id=query_id, |
| matches=verified_matches, |
| processing_time=round(processing_time, 2), |
| total_scanned=total_scanned, |
| total_verified=len(verified_matches), |
| ocr_extractions=len(ocr_domains), |
| cross_references_found=cross_ref_count, |
| summary={ |
| "high_confidence": len([m for m in verified_matches if m.get('similarity', 0) > 0.85]), |
| "medium_confidence": len([m for m in verified_matches if 0.72 <= m.get('similarity', 0) <= 0.85]), |
| "unique_domains": len(set(m.get('domain', '') for m in verified_matches if m.get('domain'))) |
| } |
| ) |
| |
| logger.success(f"[{query_id}] B煤squeda completada: {len(verified_matches)} matches verificados") |
| |
| return response |
| |
| except HTTPException: |
| raise |
| except Exception as e: |
| logger.error(f"[{query_id}] Error en b煤squeda: {e}") |
| raise HTTPException(status_code=500, detail=f"Error interno: {str(e)}") |
|
|
|
|
| @app.post("/api/v1/ocr-extract", response_model=OCRResponse) |
| async def extract_domains_ocr(file: UploadFile = File(...)): |
| """ |
| Extrae dominios de una miniatura usando OCR. |
| 脷til para procesar miniaturas censuradas de PimEyes. |
| """ |
| try: |
| |
| image_bytes = await file.read() |
| image = Image.open(io.BytesIO(image_bytes)) |
| image_np = np.array(image) |
| |
| |
| domains = components.ocr_extractor.extract_domain_from_thumb(image_np) |
| |
| |
| avg_confidence = sum(d['confidence'] for d in domains) / len(domains) if domains else 0.0 |
| |
| return OCRResponse( |
| domains=domains, |
| total_found=len(domains), |
| avg_confidence=round(avg_confidence, 3) |
| ) |
| |
| except Exception as e: |
| logger.error(f"Error en OCR: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.post("/api/v1/compare", response_model=CompareResponse) |
| async def compare_faces( |
| file1: UploadFile = File(...), |
| file2: UploadFile = File(...) |
| ): |
| """ |
| Compara dos rostros directamente y retorna la similitud. |
| """ |
| try: |
| |
| img1_bytes = await file1.read() |
| img2_bytes = await file2.read() |
| |
| img1 = np.array(Image.open(io.BytesIO(img1_bytes))) |
| img2 = np.array(Image.open(io.BytesIO(img2_bytes))) |
| |
| |
| face1 = components.face_processor.align_face(img1) |
| face2 = components.face_processor.align_face(img2) |
| |
| if face1 is None or face2 is None: |
| raise HTTPException(status_code=400, detail="No se detect贸 rostro en una o ambas im谩genes") |
| |
| |
| emb1 = components.embedding_engine.generate_embedding(face1) |
| emb2 = components.embedding_engine.generate_embedding(face2) |
| |
| |
| similarity = components.comparator.calculate_similarity(emb1, emb2) |
| |
| |
| if similarity > 0.85: |
| confidence_level = "Match Seguro" |
| elif similarity > 0.72: |
| confidence_level = "Coincidencia Probable" |
| else: |
| confidence_level = "No coincide" |
| |
| return CompareResponse( |
| similarity=round(similarity, 3), |
| confidence_level=confidence_level, |
| embedding_distance=round(1 - similarity, 3), |
| match=similarity > 0.75 |
| ) |
| |
| except HTTPException: |
| raise |
| except Exception as e: |
| logger.error(f"Error en comparaci贸n: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| @app.get("/api/v1/status/{query_id}") |
| async def get_query_status(query_id: str): |
| """ |
| Obtiene el estado y resultados de una b煤squeda previa. |
| """ |
| result = components.vector_db.get_result(query_id) |
| |
| if result is None: |
| raise HTTPException(status_code=404, detail="Query ID no encontrado") |
| |
| return result |
|
|
|
|
| if __name__ == "__main__": |
| logger.info("Iniciando servidor Aliah-Plus...") |
| |
| uvicorn.run( |
| app, |
| host="0.0.0.0", |
| port=8000, |
| log_level="info" |
| ) |
|
|