"""
OCR Extractor - Detective module for extracting hidden URLs from thumbnails.

This module defeats the blocking used by sites that censor URLs with a blur.
"""

import re
from typing import Dict, List, Optional

import cv2
import easyocr
import numpy as np
from loguru import logger


class OCRExtractor:
    """
    Extracts domains and URLs from images, even when they are blurred or
    partially hidden. Applies several pre-processing techniques to improve
    text detection in low-quality thumbnails.
    """

    # TLDs used to validate candidate domains, including adult-oriented ones.
    TLD_PATTERNS = [
        r'\.com', r'\.net', r'\.org', r'\.io', r'\.co',
        r'\.tv', r'\.me', r'\.site', r'\.app', r'\.dev',
        r'\.xxx', r'\.adult', r'\.porn', r'\.sex',
        r'\.fan', r'\.fans', r'\.cam', r'\.live'
    ]

    # Regexes for spotting full URLs and bare domains in OCR output.
    URL_PATTERNS = [
        r'https?://[^\s]+',
        r'www\.[a-zA-Z0-9-]+\.[a-zA-Z]{2,}',
        r'[a-zA-Z0-9-]+\.(?:com|net|org|io|xxx|adult|porn|cam)',
    ]

    # Platform names that frequently show up in thumbnail captions.
    KNOWN_PLATFORMS = [
        'onlyfans', 'fansly', 'patreon', 'instagram', 'twitter',
        'tiktok', 'reddit', 'imgur', 'flickr', 'tumblr',
        'xvideos', 'pornhub', 'xnxx', 'redtube', 'youporn',
        'chaturbate', 'myfreecams', 'streamate', 'bongacams'
    ]

    def __init__(self, gpu: bool = True, languages: Optional[List[str]] = None):
        """
        Initializes the OCR engine.

        Args:
            gpu: Use the GPU if available
            languages: List of languages (default: ['en'])
        """
        if languages is None:
            languages = ['en']

        logger.info(f"Initializing EasyOCR with GPU={gpu}, languages={languages}")

        try:
            self.reader = easyocr.Reader(languages, gpu=gpu)
            logger.success("EasyOCR initialized successfully")
        except Exception as e:
            logger.warning(f"GPU initialization failed, falling back to CPU: {e}")
            self.reader = easyocr.Reader(languages, gpu=False)
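
    # A minimal usage sketch (assumes EasyOCR's English model is available;
    # the first run downloads it). "thumb.jpg" is a hypothetical path used
    # only for illustration:
    #
    #     extractor = OCRExtractor(gpu=False)
    #     results = extractor.extract_text_from_image(cv2.imread("thumb.jpg"))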

    def preprocess_image(self, image_np: np.ndarray) -> List[np.ndarray]:
        """
        Pre-processes the image with multiple techniques to improve text detection.
        Returns several processed versions of the image.

        Args:
            image_np: Image as a numpy array (BGR)

        Returns:
            List of processed images
        """
        processed_images = []

        # Convert to grayscale if the image has color channels.
        if len(image_np.shape) == 3:
            gray = cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY)
        else:
            gray = image_np.copy()

        # 1. Plain grayscale.
        processed_images.append(gray)

        # 2. Global binary threshold.
        _, thresh1 = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
        processed_images.append(thresh1)

        # 3. Inverted threshold, for light text on dark backgrounds.
        _, thresh2 = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY_INV)
        processed_images.append(thresh2)

        # 4. Adaptive threshold, for uneven lighting.
        adaptive = cv2.adaptiveThreshold(
            gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY, 11, 2
        )
        processed_images.append(adaptive)

        # 5. Local contrast enhancement (CLAHE).
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)
        processed_images.append(enhanced)

        # 6. Non-local means denoising.
        denoised = cv2.fastNlMeansDenoising(gray, None, 10, 7, 21)
        processed_images.append(denoised)

        # 7. Sharpening with a Laplacian-style kernel.
        kernel_sharpen = np.array([[-1, -1, -1],
                                   [-1, 9, -1],
                                   [-1, -1, -1]])
        sharpened = cv2.filter2D(gray, -1, kernel_sharpen)
        processed_images.append(sharpened)

        return processed_images
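
    # Sketch for inspecting the variants by eye (paths are hypothetical):
    #
    #     img = cv2.imread("thumb.jpg")
    #     for i, variant in enumerate(OCRExtractor(gpu=False).preprocess_image(img)):
    #         cv2.imwrite(f"variant_{i}.png", variant)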

    def extract_text_from_image(self, image_np: np.ndarray) -> List[Dict]:
        """
        Extracts all visible text from an image.

        Args:
            image_np: Image as a numpy array

        Returns:
            List of dictionaries with detected text and confidence
        """
        all_results = []

        # Run OCR over every pre-processed variant of the image.
        processed_images = self.preprocess_image(image_np)

        for idx, processed in enumerate(processed_images):
            try:
                results = self.reader.readtext(processed, paragraph=False)

                for bbox, text, confidence in results:
                    all_results.append({
                        'text': text,
                        'confidence': float(confidence),
                        'bbox': bbox,
                        'preprocessing_method': idx
                    })

            except Exception as e:
                logger.debug(f"Error in preprocessing method {idx}: {e}")
                continue

        # Keep only the highest-confidence hit for each distinct text.
        unique_results = self._deduplicate_results(all_results)

        return unique_results
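
    # Each entry looks roughly like this (values are illustrative):
    #
    #     {'text': 'example.com', 'confidence': 0.83,
    #      'bbox': [[x0, y0], [x1, y0], [x1, y1], [x0, y1]],
    #      'preprocessing_method': 3}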

    def _deduplicate_results(self, results: List[Dict]) -> List[Dict]:
        """
        Removes duplicate results, keeping the one with the highest confidence.
        """
        seen = {}

        for result in results:
            text = result['text'].lower().strip()

            if text not in seen or result['confidence'] > seen[text]['confidence']:
                seen[text] = result

        return list(seen.values())

    def extract_domain_from_thumb(self, image_np: np.ndarray,
                                  min_confidence: float = 0.6) -> List[Dict]:
        """
        Extracts domains specifically from a thumbnail.
        This is the main method for defeating the PimEyes block.

        Args:
            image_np: Image as a numpy array
            min_confidence: Minimum confidence to accept a hit (0.0-1.0)

        Returns:
            List of domains found, with metadata
        """
        text_results = self.extract_text_from_image(image_np)

        found_domains = []

        for result in text_results:
            text = result['text']
            confidence = result['confidence']

            if confidence < min_confidence:
                continue

            # Normalize the raw OCR text before matching.
            cleaned_text = self._clean_text(text)

            # Look for domains in the cleaned text.
            domains = self._find_domains_in_text(cleaned_text)

            for domain in domains:
                found_domains.append({
                    'domain': domain,
                    'confidence': confidence,
                    'original_text': text,
                    'cleaned_text': cleaned_text,
                    'bbox': result['bbox'],
                    'method': result['preprocessing_method']
                })

        # Highest-confidence hits first.
        found_domains.sort(key=lambda x: x['confidence'], reverse=True)

        # Drop duplicate domains, keeping the best hit for each.
        unique_domains = self._deduplicate_domains(found_domains)

        logger.info(f"OCR: found {len(unique_domains)} unique domains")

        return unique_domains
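
    # Typical call (the threshold and the `img` array are illustrative):
    #
    #     hits = extractor.extract_domain_from_thumb(img, min_confidence=0.7)
    #     best = hits[0]['domain'] if hits else None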

    def _clean_text(self, text: str) -> str:
        """
        Cleans the extracted text to improve domain detection.
        """
        text = text.lower()

        # Fix common OCR misreads *before* stripping whitespace: several of
        # these corrections (e.g. 'c om') depend on the original spacing, so
        # applying them after whitespace removal would make them dead code.
        corrections = {
            'c0m': 'com',
            'c om': 'com',
            'co m': 'com',
            'n et': 'net',
            'ne t': 'net',
            '0rg': 'org',
            'o rg': 'org',
            'i o': 'io',
            'tv ': 'tv',
            'xxx ': 'xxx',
        }

        for wrong, correct in corrections.items():
            text = text.replace(wrong, correct)

        # Remove any remaining whitespace.
        text = re.sub(r'\s+', '', text)

        return text
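
    # For example, the OCR misread "Example. c0m" is cleaned to
    # "example.com": lowercased, 'c0m' corrected, whitespace stripped.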

    def _find_domains_in_text(self, text: str) -> List[str]:
        """
        Finds domains in a piece of text using patterns and heuristics.
        """
        domains = []

        # 1. Direct URL patterns.
        for pattern in self.URL_PATTERNS:
            matches = re.findall(pattern, text, re.IGNORECASE)
            domains.extend(matches)

        # 2. Anything ending in a known TLD, optionally followed by a path.
        for tld_pattern in self.TLD_PATTERNS:
            pattern = r'([a-zA-Z0-9-]+' + tld_pattern + r'(?:/[^\s]*)?)'
            matches = re.findall(pattern, text, re.IGNORECASE)
            domains.extend(matches)

        # 3. Known platforms, with the username if one is attached.
        for platform in self.KNOWN_PLATFORMS:
            if platform in text:
                username_pattern = rf'{platform}\.com/([a-zA-Z0-9_-]+)'
                username_match = re.search(username_pattern, text)

                if username_match:
                    domains.append(f"{platform}.com/{username_match.group(1)}")
                else:
                    domains.append(f"{platform}.com")

        # Normalize and validate the candidates.
        cleaned_domains = []
        for domain in domains:
            domain = domain.strip().lower()
            domain = re.sub(r'^https?://', '', domain)
            domain = re.sub(r'^www\.', '', domain)

            if self._is_valid_domain(domain):
                cleaned_domains.append(domain)

        return list(set(cleaned_domains))
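
    # E.g. _find_domains_in_text("checkwww.example.com/now") yields
    # ['example.com', 'example.com/now'] in some order: the www. variant is
    # normalized, and the TLD heuristic also captures the trailing path.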

    def _is_valid_domain(self, domain: str) -> bool:
        """
        Checks that a string looks like a valid domain.
        """
        # Must contain a dot.
        if '.' not in domain:
            return False

        # Must not contain spaces.
        if ' ' in domain:
            return False

        # Must contain a known TLD. The leading dot is kept on purpose:
        # also stripping the dot would let short TLDs such as 'co' or 'me'
        # match almost any string.
        has_valid_tld = any(tld.replace('\\', '') in domain
                            for tld in self.TLD_PATTERNS)

        return has_valid_tld
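
    # Quick sanity checks: 'example.com' -> True, 'example' -> False (no dot),
    # 'ex ample.com' -> False (space), 'photo.jpeg' -> False (no known TLD).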

    def _deduplicate_domains(self, domains: List[Dict]) -> List[Dict]:
        """
        Removes duplicate domains, keeping the one with the highest confidence.
        """
        seen = {}

        for item in domains:
            domain = item['domain']

            if domain not in seen or item['confidence'] > seen[domain]['confidence']:
                seen[domain] = item

        return list(seen.values())

    def extract_from_pimeyes_thumbnail(self, image_np: np.ndarray) -> Dict:
        """
        Specialized method for PimEyes thumbnails.
        Applies techniques specific to this site.

        Args:
            image_np: PimEyes thumbnail (usually blurred)

        Returns:
            Dictionary with the extracted domains and metadata
        """
        logger.info("Processing PimEyes thumbnail with specialized techniques")

        height = image_np.shape[0]

        # The URL is usually overlaid on the bottom strip of the thumbnail.
        bottom_region = image_np[int(height * 0.7):, :]

        # Sharpen the blurred text region before running OCR.
        deblurred = self._deblur_text_region(bottom_region)

        # Use a lower threshold, since blurred text scores lower confidence.
        domains = self.extract_domain_from_thumb(deblurred, min_confidence=0.5)

        return {
            'domains': domains,
            'source': 'pimeyes',
            'confidence_avg': np.mean([d['confidence'] for d in domains]) if domains else 0.0,
            'total_found': len(domains)
        }
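
    # The returned dict has this shape (values are illustrative):
    #
    #     {'domains': [...], 'source': 'pimeyes',
    #      'confidence_avg': 0.72, 'total_found': 2}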

    def _deblur_text_region(self, image_np: np.ndarray) -> np.ndarray:
        """
        Applies deblurring techniques tailored to text regions.
        """
        if len(image_np.shape) == 3:
            gray = cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY)
        else:
            gray = image_np

        # Light 3x3 box blur to smooth noise before sharpening.
        kernel = np.ones((3, 3), np.float32) / 9
        deblurred = cv2.filter2D(gray, -1, kernel)

        # Strong unsharp-style kernel; the weights sum to 1 after the /8.0,
        # so overall brightness is preserved.
        kernel_sharpen = np.array([[-1, -1, -1, -1, -1],
                                   [-1, 2, 2, 2, -1],
                                   [-1, 2, 8, 2, -1],
                                   [-1, 2, 2, 2, -1],
                                   [-1, -1, -1, -1, -1]]) / 8.0

        sharpened = cv2.filter2D(deblurred, -1, kernel_sharpen)

        # Histogram equalization to stretch the contrast of the text.
        sharpened = cv2.equalizeHist(sharpened.astype(np.uint8))

        return sharpened


def quick_extract_domains(image_path: str, min_confidence: float = 0.6) -> List[str]:
    """
    Convenience function for quickly extracting domains.

    Args:
        image_path: Path to the image
        min_confidence: Minimum confidence

    Returns:
        List of domains found
    """
    image = cv2.imread(image_path)
    if image is None:
        raise ValueError(f"Could not load image: {image_path}")

    extractor = OCRExtractor()
    results = extractor.extract_domain_from_thumb(image, min_confidence)

    return [r['domain'] for r in results]
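
# Example call (the path is hypothetical):
#
#     domains = quick_extract_domains("screenshots/result_03.png", min_confidence=0.5)
#     print(domains)  # e.g. ['example.com']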


if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        image_path = sys.argv[1]
        domains = quick_extract_domains(image_path)

        print(f"\n🔍 Domains found: {len(domains)}")
        for domain in domains:
            print(f"  • {domain}")
    else:
        print("Usage: python ocr_extractor.py <image_path>")