J / src /ocr_extractor.py
Andro0s's picture
Upload 13 files
85fa7d2 verified
"""
OCR Extractor - Módulo Detective para extraer URLs ocultas de miniaturas
Este módulo rompe el bloqueo de sitios que censuran URLs con blur.
"""
import easyocr
import numpy as np
import cv2
import re
from typing import List, Dict, Optional
from loguru import logger
class OCRExtractor:
"""
Extrae dominios y URLs de imágenes, incluso si están borrosas o parcialmente ocultas.
Implementa técnicas de pre-procesamiento para mejorar la detección en miniaturas de baja calidad.
"""
# Extensiones de dominio comunes
TLD_PATTERNS = [
r'\.com', r'\.net', r'\.org', r'\.io', r'\.co',
r'\.tv', r'\.me', r'\.site', r'\.app', r'\.dev',
r'\.xxx', r'\.adult', r'\.porn', r'\.sex', # Dominios adultos
r'\.fan', r'\.fans', r'\.cam', r'\.live'
]
# Patrones de URL completas
URL_PATTERNS = [
r'https?://[^\s]+', # URLs con protocolo
r'www\.[a-zA-Z0-9-]+\.[a-zA-Z]{2,}', # www.dominio.com
r'[a-zA-Z0-9-]+\.(?:com|net|org|io|xxx|adult|porn|cam)', # dominio.com
]
# Plataformas conocidas
KNOWN_PLATFORMS = [
'onlyfans', 'fansly', 'patreon', 'instagram', 'twitter',
'tiktok', 'reddit', 'imgur', 'flickr', 'tumblr',
'xvideos', 'pornhub', 'xnxx', 'redtube', 'youporn',
'chaturbate', 'myfreecams', 'streamate', 'bongacams'
]
def __init__(self, gpu: bool = True, languages: List[str] = None):
"""
Inicializa el OCR engine.
Args:
gpu: Usar GPU si está disponible
languages: Lista de idiomas (default: ['en'])
"""
if languages is None:
languages = ['en']
logger.info(f"Inicializando EasyOCR con GPU={gpu}, idiomas={languages}")
try:
self.reader = easyocr.Reader(languages, gpu=gpu)
logger.success("EasyOCR inicializado correctamente")
except Exception as e:
logger.warning(f"Error al inicializar con GPU, usando CPU: {e}")
self.reader = easyocr.Reader(languages, gpu=False)
def preprocess_image(self, image_np: np.ndarray) -> List[np.ndarray]:
"""
Pre-procesa la imagen con múltiples técnicas para mejorar la detección de texto.
Retorna múltiples versiones de la imagen procesada.
Args:
image_np: Imagen en formato numpy array (BGR)
Returns:
Lista de imágenes procesadas
"""
processed_images = []
# Convertir a escala de grises
if len(image_np.shape) == 3:
gray = cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY)
else:
gray = image_np.copy()
# 1. Imagen original en escala de grises
processed_images.append(gray)
# 2. Umbral binario (para texto oscuro en fondo claro)
_, thresh1 = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
processed_images.append(thresh1)
# 3. Umbral binario invertido (para texto claro en fondo oscuro)
_, thresh2 = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY_INV)
processed_images.append(thresh2)
# 4. Umbral adaptativo (para imágenes con iluminación irregular)
adaptive = cv2.adaptiveThreshold(
gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2
)
processed_images.append(adaptive)
# 5. Mejorar contraste con CLAHE
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
enhanced = clahe.apply(gray)
processed_images.append(enhanced)
# 6. Reducción de ruido
denoised = cv2.fastNlMeansDenoising(gray, None, 10, 7, 21)
processed_images.append(denoised)
# 7. Sharpening (para texto borroso)
kernel_sharpen = np.array([[-1, -1, -1],
[-1, 9, -1],
[-1, -1, -1]])
sharpened = cv2.filter2D(gray, -1, kernel_sharpen)
processed_images.append(sharpened)
return processed_images
def extract_text_from_image(self, image_np: np.ndarray) -> List[Dict]:
"""
Extrae todo el texto visible de una imagen.
Args:
image_np: Imagen en formato numpy array
Returns:
Lista de diccionarios con texto detectado y confianza
"""
all_results = []
# Procesar múltiples versiones de la imagen
processed_images = self.preprocess_image(image_np)
for idx, processed in enumerate(processed_images):
try:
results = self.reader.readtext(processed, paragraph=False)
for bbox, text, confidence in results:
all_results.append({
'text': text,
'confidence': float(confidence),
'bbox': bbox,
'preprocessing_method': idx
})
except Exception as e:
logger.debug(f"Error en método de preprocesamiento {idx}: {e}")
continue
# Eliminar duplicados y mantener los de mayor confianza
unique_results = self._deduplicate_results(all_results)
return unique_results
def _deduplicate_results(self, results: List[Dict]) -> List[Dict]:
"""
Elimina resultados duplicados, manteniendo el de mayor confianza.
"""
seen = {}
for result in results:
text = result['text'].lower().strip()
if text not in seen or result['confidence'] > seen[text]['confidence']:
seen[text] = result
return list(seen.values())
def extract_domain_from_thumb(self, image_np: np.ndarray,
min_confidence: float = 0.6) -> List[Dict]:
"""
Extrae dominios específicamente de una miniatura.
Este es el método principal para romper el bloqueo de PimEyes.
Args:
image_np: Imagen en formato numpy array
min_confidence: Confianza mínima para considerar válido (0.0-1.0)
Returns:
Lista de dominios encontrados con metadata
"""
# Extraer todo el texto
text_results = self.extract_text_from_image(image_np)
found_domains = []
for result in text_results:
text = result['text']
confidence = result['confidence']
if confidence < min_confidence:
continue
# Limpiar texto
cleaned_text = self._clean_text(text)
# Buscar dominios
domains = self._find_domains_in_text(cleaned_text)
for domain in domains:
found_domains.append({
'domain': domain,
'confidence': confidence,
'original_text': text,
'cleaned_text': cleaned_text,
'bbox': result['bbox'],
'method': result['preprocessing_method']
})
# Ordenar por confianza
found_domains.sort(key=lambda x: x['confidence'], reverse=True)
# Eliminar duplicados
unique_domains = self._deduplicate_domains(found_domains)
logger.info(f"OCR: Encontrados {len(unique_domains)} dominios únicos")
return unique_domains
def _clean_text(self, text: str) -> str:
"""
Limpia el texto extraído para mejorar la detección de dominios.
"""
# Convertir a minúsculas
text = text.lower()
# Remover espacios múltiples
text = re.sub(r'\s+', '', text)
# Corregir errores comunes de OCR
corrections = {
'c0m': 'com',
'c om': 'com',
'co m': 'com',
'n et': 'net',
'ne t': 'net',
'0rg': 'org',
'o rg': 'org',
'i o': 'io',
'tv ': 'tv',
'xxx ': 'xxx',
}
for wrong, correct in corrections.items():
text = text.replace(wrong, correct)
return text
def _find_domains_in_text(self, text: str) -> List[str]:
"""
Encuentra dominios en un texto usando patrones y heurísticas.
"""
domains = []
# Método 1: Buscar con regex de URLs
for pattern in self.URL_PATTERNS:
matches = re.findall(pattern, text, re.IGNORECASE)
domains.extend(matches)
# Método 2: Buscar TLDs
for tld_pattern in self.TLD_PATTERNS:
# Buscar palabra seguida de TLD
pattern = r'([a-zA-Z0-9-]+' + tld_pattern + r'(?:/[^\s]*)?)'
matches = re.findall(pattern, text, re.IGNORECASE)
domains.extend(matches)
# Método 3: Buscar plataformas conocidas
for platform in self.KNOWN_PLATFORMS:
if platform in text:
# Intentar extraer username si existe
username_pattern = rf'{platform}\.com/([a-zA-Z0-9_-]+)'
username_match = re.search(username_pattern, text)
if username_match:
domains.append(f"{platform}.com/{username_match.group(1)}")
else:
domains.append(f"{platform}.com")
# Limpiar y validar dominios
cleaned_domains = []
for domain in domains:
domain = domain.strip().lower()
domain = re.sub(r'^https?://', '', domain)
domain = re.sub(r'^www\.', '', domain)
# Validar que parece un dominio válido
if self._is_valid_domain(domain):
cleaned_domains.append(domain)
return list(set(cleaned_domains)) # Eliminar duplicados
def _is_valid_domain(self, domain: str) -> bool:
"""
Valida que una cadena parece ser un dominio válido.
"""
# Debe tener al menos un punto
if '.' not in domain:
return False
# No debe tener espacios
if ' ' in domain:
return False
# Debe tener un TLD válido
has_valid_tld = any(tld.replace('\\', '').replace('.', '') in domain
for tld in self.TLD_PATTERNS)
return has_valid_tld
def _deduplicate_domains(self, domains: List[Dict]) -> List[Dict]:
"""
Elimina dominios duplicados, manteniendo el de mayor confianza.
"""
seen = {}
for item in domains:
domain = item['domain']
if domain not in seen or item['confidence'] > seen[domain]['confidence']:
seen[domain] = item
return list(seen.values())
def extract_from_pimeyes_thumbnail(self, image_np: np.ndarray) -> Dict:
"""
Método especializado para miniaturas de PimEyes.
Aplica técnicas específicas para este sitio.
Args:
image_np: Miniatura de PimEyes (generalmente con blur)
Returns:
Diccionario con dominios extraídos y metadata
"""
logger.info("Procesando miniatura de PimEyes con técnicas especializadas")
# PimEyes suele poner el dominio en la parte inferior
height = image_np.shape[0]
# Extraer solo la parte inferior (donde suele estar el texto)
bottom_region = image_np[int(height * 0.7):, :]
# Aplicar mejoras específicas para texto con blur
deblurred = self._deblur_text_region(bottom_region)
# Extraer dominios
domains = self.extract_domain_from_thumb(deblurred, min_confidence=0.5)
return {
'domains': domains,
'source': 'pimeyes',
'confidence_avg': np.mean([d['confidence'] for d in domains]) if domains else 0.0,
'total_found': len(domains)
}
def _deblur_text_region(self, image_np: np.ndarray) -> np.ndarray:
"""
Aplica técnicas de deblurring específicas para regiones de texto.
"""
# Convertir a escala de grises
if len(image_np.shape) == 3:
gray = cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY)
else:
gray = image_np
# Aplicar Wiener filter aproximado
kernel = np.ones((3, 3), np.float32) / 9
deblurred = cv2.filter2D(gray, -1, kernel)
# Sharpen agresivo
kernel_sharpen = np.array([[-1, -1, -1, -1, -1],
[-1, 2, 2, 2, -1],
[-1, 2, 8, 2, -1],
[-1, 2, 2, 2, -1],
[-1, -1, -1, -1, -1]]) / 8.0
sharpened = cv2.filter2D(deblurred, -1, kernel_sharpen)
# Aumentar contraste
sharpened = cv2.equalizeHist(sharpened.astype(np.uint8))
return sharpened
# Función de utilidad para uso directo
def quick_extract_domains(image_path: str, min_confidence: float = 0.6) -> List[str]:
"""
Función de conveniencia para extraer dominios rápidamente.
Args:
image_path: Ruta a la imagen
min_confidence: Confianza mínima
Returns:
Lista de dominios encontrados
"""
import cv2
image = cv2.imread(image_path)
if image is None:
raise ValueError(f"No se pudo cargar la imagen: {image_path}")
extractor = OCRExtractor()
results = extractor.extract_domain_from_thumb(image, min_confidence)
return [r['domain'] for r in results]
if __name__ == "__main__":
# Ejemplo de uso
import sys
if len(sys.argv) > 1:
image_path = sys.argv[1]
domains = quick_extract_domains(image_path)
print(f"\n🔍 Dominios encontrados: {len(domains)}")
for domain in domains:
print(f" • {domain}")
else:
print("Uso: python ocr_extractor.py <ruta_imagen>")