CCI_OLLAMA_CODE_CHAT / file_processing.py
jeanmarcocruz207's picture
Upload 29 files
3754f8b verified
import io
import json
import zipfile
from pathlib import Path
from typing import Tuple, List, Set
from config import settings
# Optional feature flags: each one is True only when the libraries backing
# that feature can be imported in the current environment.
try:
    from PIL import Image  # noqa
    import pytesseract  # noqa
except ImportError:
    HAS_OCR = False
else:
    HAS_OCR = True

try:
    import PyPDF2  # noqa
except ImportError:
    HAS_PDF = False
else:
    HAS_PDF = True
# Maps a lower-cased file suffix (including the leading dot) to the language
# label used when rendering that file's content.
CODE_EXTS = {
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".java": "java",
    ".cs": "csharp",
    ".php": "php",
    ".rb": "ruby",
    ".go": "go",
    ".rs": "rust",
    ".c": "c",
    ".cpp": "cpp",
    ".h": "c",
    ".css": "css",
    ".html": "html",
    ".htm": "html",
    ".sql": "sql",
    ".sh": "bash",
    ".bash": "bash",
    ".yml": "yaml",
    ".yaml": "yaml",
    ".json": "json",
    ".xml": "xml",
    ".md": "markdown",
}
def guess_lang_from_name(name: str):
    """Return the language label for *name*'s file suffix, or None if unknown.

    The suffix is compared case-insensitively against the keys of CODE_EXTS.
    """
    extension = Path(name).suffix.lower()
    return CODE_EXTS.get(extension)
def guess_lang_from_content(content: str):
    """Guess a language label from marker substrings found in *content*.

    Args:
        content: Text to inspect. Any non-str value yields None.

    Returns:
        "java", "javascript", "python", "sql" or "html" for the first rule
        that matches, or None when no rule matches.
    """
    if not isinstance(content, str):
        return None
    low = content.lower()
    # Java and JavaScript are tested before Python: both routinely contain
    # "import ", which the loose Python rule below would otherwise claim,
    # making their own rules unreachable for typical source files.
    if "public class" in content or "System.out.println" in content:
        return "java"
    if "function " in low and "console.log" in low:
        return "javascript"
    if "def " in low or "import " in low:
        return "python"
    if "select " in low or "create table" in low:
        return "sql"
    if "<html" in low:
        return "html"
    return None
def truncate_text(txt: str) -> str:
    """Cap *txt* at settings.MAX_CHARS_PER_FILE characters.

    Text within the limit is returned unchanged; longer text is cut at the
    limit and a trailing marker line is appended.
    """
    limit = settings.MAX_CHARS_PER_FILE
    if len(txt) > limit:
        return txt[:limit] + "\n[... archivo recortado ...]"
    return txt
def read_image_to_text(raw: bytes) -> str:
    """Run OCR over an in-memory image and return the recognised text.

    Args:
        raw: Encoded image bytes.

    Returns:
        The stripped OCR text, or a bracketed placeholder message when OCR
        support is not installed, no text was found, or any error occurred.
        This function never raises.
    """
    if not HAS_OCR:
        return "[Funcionalidad OCR no disponible. Instala 'pytesseract' y 'tesseract-ocr']"
    try:
        from PIL import Image
        import pytesseract

        # The context manager releases the image's resources as soon as the
        # OCR pass is done instead of waiting for garbage collection.
        with Image.open(io.BytesIO(raw)) as img:
            text = pytesseract.image_to_string(img)
        return text.strip() or "[Imagen sin texto extraíble]"
    except Exception as e:
        # Deliberate catch-all: failures are reported inline to the caller.
        return f"[Error OCR: {e}]"
def read_pdf_to_text(raw: bytes) -> str:
    """Extract the text of every page of an in-memory PDF.

    Returns the page texts joined by newlines and stripped, or a bracketed
    placeholder message when PDF support is not installed, the document has
    no extractable text, or reading it fails.
    """
    if not HAS_PDF:
        return "[Funcionalidad PDF no disponible. Instala 'PyPDF2']"
    try:
        import PyPDF2

        document = PyPDF2.PdfReader(io.BytesIO(raw))
        page_texts = []
        for page in document.pages:
            extracted = page.extract_text()
            # Pages without extractable text contribute an empty string.
            page_texts.append(extracted if extracted else "")
        joined = "\n".join(page_texts).strip()
    except Exception as e:
        return f"[Error PDF: {e}]"
    return joined or "[PDF sin texto extraíble]"
def read_zip(raw: bytes, zip_name: str) -> str:
    """Collect the text of every recognised source/text file inside a ZIP.

    Only members whose suffix is in CODE_EXTS or is ".txt"/".md" are read;
    everything else is skipped without being decompressed.

    Args:
        raw: Bytes of the ZIP archive.
        zip_name: Archive name, used only in the returned messages.

    Returns:
        One "--- <path> (<lang>) ---" section per recognised member, joined
        by newlines, or a bracketed placeholder message when the archive has
        no recognised members or cannot be read. This function never raises.
    """
    plain_text_exts = {".txt", ".md"}
    # truncate_text keeps at most MAX_CHARS_PER_FILE characters and a UTF-8
    # character is at most 4 bytes, so this prefix always decodes to more
    # characters than the cap: the truncated text is the same as when reading
    # the whole member, while a decompression bomb cannot exhaust memory.
    max_bytes = 4 * (settings.MAX_CHARS_PER_FILE + 1)
    collected = []
    try:
        with zipfile.ZipFile(io.BytesIO(raw)) as zf:
            for info in zf.infolist():
                if info.is_dir():
                    continue
                inner_name = info.filename
                ext = Path(inner_name).suffix.lower()
                # Filter before reading so irrelevant members are never
                # decompressed.
                if ext not in CODE_EXTS and ext not in plain_text_exts:
                    continue
                # Open by ZipInfo (not by name) so duplicate member names
                # each yield their own content.
                with zf.open(info) as member:
                    data = member.read(max_bytes)
                # errors="replace" cannot fail on bytes, so no fallback needed.
                text = data.decode("utf-8", errors="replace")
                lang = guess_lang_from_name(inner_name) or guess_lang_from_content(text) or "text"
                text = truncate_text(text)
                collected.append(f"--- {inner_name} ({lang}) ---\n{text}\n")
    except zipfile.BadZipFile:
        return f"[Error leyendo ZIP: archivo corrupto ({zip_name})]"
    except Exception as e:
        # Deliberate catch-all (e.g. encrypted members): report inline.
        return f"[Error leyendo ZIP: {e}]"
    return "\n".join(collected) if collected else f"[ZIP {zip_name} sin archivos útiles]"
def read_uploaded_files(files, exclude_text: str) -> Tuple[str, str, str]:
    """Turn a batch of uploaded files into text, a preview list and a snippet.

    Args:
        files: Iterable of file-like objects exposing ``read()`` and,
            optionally, ``name`` and ``seek()``.
        exclude_text: Newline-separated file names (base name or full name)
            to skip. None or an empty string excludes nothing.

    Returns:
        A ``(combined_text, preview, first_code)`` tuple: the rendered
        content of every processed file, one preview line per file, and the
        content of the first ZIP or recognised code file ("" if none).
    """
    if not files:
        return "", "Sin archivos", ""

    image_exts = (".png", ".jpg", ".jpeg", ".webp", ".bmp")
    # Tolerate a missing exclusion list instead of failing on None.
    exclude: Set[str] = {x.strip() for x in (exclude_text or "").splitlines() if x.strip()}
    parts: List[str] = []
    preview: List[str] = []
    total_size = 0
    first_code = ""

    for f in files:
        name = getattr(f, "name", "archivo")
        basename = Path(name).name
        if basename in exclude or name in exclude:
            preview.append(f"🚫 {basename} (excluido)")
            continue

        try:
            f.seek(0)
        except (AttributeError, OSError, ValueError):
            # Best-effort rewind: non-seekable or closed streams are read
            # from their current position.
            pass
        raw = f.read()
        file_size = len(raw)

        # Per-file limit first: an oversized file is skipped and must not
        # consume the shared budget nor abort the remaining files.
        if file_size > settings.MAX_FILE_SIZE:
            parts.append(f"# {basename}\n[Archivo muy grande, ignorado]\n")
            preview.append(f"⚠️ {basename} (muy grande)")
            continue
        if total_size + file_size > settings.MAX_TOTAL_UPLOAD:
            preview.append("⚠️ Límite total de carga superado, se ignoró el resto.")
            break
        total_size += file_size

        suffix = Path(basename).suffix.lower()
        if suffix == ".zip":
            content = read_zip(raw, basename)
            parts.append(f"# {basename} (zip)\n{content}\n")
            preview.append(f"📦 {basename}")
            if not first_code and content.strip():
                first_code = content[:settings.MAX_CHARS_PER_FILE]
        elif suffix in image_exts:
            content = read_image_to_text(raw)
            parts.append(f"# {basename} (imagen)\n{content}\n")
            preview.append(f"🖼️ {basename}")
        elif suffix == ".pdf":
            content = read_pdf_to_text(raw)
            parts.append(f"# {basename} (pdf)\n{content}\n")
            preview.append(f"📄 {basename}")
        else:
            if isinstance(raw, str):
                # File object opened in text mode: already decoded.
                text = raw
            else:
                try:
                    text = raw.decode("utf-8", errors="replace")
                except (AttributeError, TypeError, UnicodeError):
                    text = "[No decodificable]"
            text = truncate_text(text)
            lang = guess_lang_from_name(basename) or guess_lang_from_content(text) or "text"
            parts.append(f"# {basename} ({lang})\n{text}\n")
            preview.append(f"📝 {basename} ({lang})")
            if not first_code and lang != "text":
                first_code = text

    return "\n".join(parts), "\n".join(preview) if preview else "Sin archivos válidos", first_code