# -*- coding: utf-8 -*-
"""
HF Space - Remote Indexer (No-Qdrant)
Vector storage & search with 🤗 datasets + FAISS (local), Gradio UI.
Pipeline:
- /index: chunk → embeddings (HF Inference or dummy) → Dataset.from_dict → add_faiss_index(IP) → save_to_disk
- /count: reads the on-disk dataset (if not already loaded) → returns the row count
- /query: embed the query → dataset.get_nearest_examples('embedding', query, k)
- /wipe: deletes the project folder
- /export_hub (optional): pushes the project folder to a Dataset repo on the Hub
ENV:
- EMB_PROVIDER ("hf" | "dummy", default "hf")
- HF_EMBED_MODEL (e.g. "BAAI/bge-m3" | "intfloat/e5-base-v2")
- HUGGINGFACEHUB_API_TOKEN (required if EMB_PROVIDER=hf)
- EMB_FALLBACK_TO_DUMMY (true/false)
- DATA_DIR (default: auto-pick writable: $DATA_DIR, ./data, /home/user/app/data, /home/user/data, /tmp/data)
- HF_DATASET_REPO (optional "username/my_proj_vectors") for export
- LOG_LEVEL (DEBUG by default)
- UI_PATH ("/ui")
- PORT (7860)
"""
from __future__ import annotations
import os
import io
import re
import json
import time
import uuid
import shutil
import hashlib
import logging
import asyncio
import threading
from typing import List, Dict, Any, Optional, Tuple

import numpy as np
import httpx
import uvicorn
import gradio as gr
import faiss  # type: ignore
from pydantic import BaseModel, Field, ValidationError
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from datasets import Dataset, Features, Sequence, Value, load_from_disk

try:
    from huggingface_hub import HfApi, create_repo
except Exception:
    HfApi = None
    create_repo = None
# ------------------------------------------------------------------------------
# Config & logging
# ------------------------------------------------------------------------------
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper()
logging.basicConfig(
    level=getattr(logging, LOG_LEVEL, logging.DEBUG),
    format="%(asctime)s - %(levelname)s - %(message)s",
)
LOG = logging.getLogger("remote_indexer_noqdrant")

EMB_PROVIDER = os.getenv("EMB_PROVIDER", "hf").lower()  # "hf" | "dummy"
HF_EMBED_MODEL = os.getenv("HF_EMBED_MODEL", "BAAI/bge-m3")
HF_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN", "")
EMB_FALLBACK_TO_DUMMY = os.getenv("EMB_FALLBACK_TO_DUMMY", "false").lower() in ("1", "true", "yes", "on")
UI_PATH = os.getenv("UI_PATH", "/ui")
HF_DATASET_REPO = os.getenv("HF_DATASET_REPO", "").strip()  # optional

if EMB_PROVIDER == "hf" and not HF_TOKEN and not EMB_FALLBACK_TO_DUMMY:
    LOG.warning("EMB_PROVIDER=hf without HUGGINGFACEHUB_API_TOKEN (and no fallback). Set EMB_PROVIDER=dummy or EMB_FALLBACK_TO_DUMMY=true for testing.")
# ------------------------------------------------------------------------------
# Robust selection of a writable DATA_DIR
# ------------------------------------------------------------------------------
def pick_data_dir() -> str:
    candidates = [
        os.getenv("DATA_DIR", "").strip(),  # env var takes priority if set
        os.path.join(os.getcwd(), "data"),  # ./data in the WORKDIR (/app)
        "/home/user/app/data",              # typical HF Spaces paths
        "/home/user/data",
        "/tmp/data",                        # always writable
    ]
    for p in candidates:
        if not p:
            continue
        try:
            os.makedirs(p, exist_ok=True)
            testp = os.path.join(p, ".rw_test")
            with open(testp, "w", encoding="utf-8") as f:
                f.write("ok")
            os.remove(testp)
            LOG.info(f"[DATA_DIR] Using: {p}")
            return p
        except Exception as e:
            LOG.warning(f"[DATA_DIR] Candidate not writable '{p}': {e}")
    raise RuntimeError("No writable DATA_DIR directory available.")

DATA_DIR = pick_data_dir()
# ------------------------------------------------------------------------------
# Pydantic models
# ------------------------------------------------------------------------------
class FileItem(BaseModel):
    path: str
    text: str

class IndexRequest(BaseModel):
    project_id: str = Field(..., min_length=1)
    files: List[FileItem] = Field(default_factory=list)
    chunk_size: int = Field(200, ge=64, le=4096)
    overlap: int = Field(20, ge=0, le=512)
    batch_size: int = Field(32, ge=1, le=1024)
    store_text: bool = True

class QueryRequest(BaseModel):
    project_id: str
    text: str
    top_k: int = Field(5, ge=1, le=100)

class JobState(BaseModel):
    job_id: str
    project_id: str
    stage: str = "pending"  # pending -> embedding -> indexing -> done/failed
    total_files: int = 0
    total_chunks: int = 0
    embedded: int = 0
    indexed: int = 0
    errors: List[str] = Field(default_factory=list)
    messages: List[str] = Field(default_factory=list)
    started_at: float = Field(default_factory=time.time)
    finished_at: Optional[float] = None

    def log(self, msg: str) -> None:
        stamp = time.strftime("%H:%M:%S")
        line = f"[{stamp}] {msg}"
        self.messages.append(line)
        LOG.debug(f"[{self.job_id}] {msg}")

JOBS: Dict[str, JobState] = {}
# In-memory cache {project_id: (Dataset, dim)}
DATASETS: Dict[str, Tuple[Dataset, int]] = {}
# ------------------------------------------------------------------------------
# Utils
# ------------------------------------------------------------------------------
def hash8(s: str) -> str:
    return hashlib.sha256(s.encode("utf-8")).hexdigest()[:16]

def l2_normalize(vec: List[float]) -> List[float]:
    arr = np.array(vec, dtype=np.float32)
    n = float(np.linalg.norm(arr))
    if n > 0:
        arr = arr / n
    return arr.astype(np.float32).tolist()

def flatten_any(x: Any) -> List[float]:
    if isinstance(x, (list, tuple)):
        if len(x) > 0 and isinstance(x[0], (list, tuple)):
            return flatten_any(x[0])
        return list(map(float, x))
    raise ValueError("Malformed embedding vector")
def chunk_text(text: str, chunk_size: int, overlap: int) -> List[Tuple[int, int, str]]:
    text = text or ""
    if not text.strip():
        return []
    res = []
    n = len(text)
    i = 0
    while i < n:
        j = min(i + chunk_size, n)
        chunk = text[i:j]
        if len(chunk.strip()) >= 30:
            res.append((i, j, chunk))
        if j >= n:
            # Final window reached; stepping back by `overlap` here would
            # reproduce the same tail window forever.
            break
        # Always advance, even when overlap >= chunk_size.
        i = max(j - overlap, i + 1)
    return res
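# A minimal sketch of the sliding window chunk_text produces (illustrative
# helper, an assumption: it is not called anywhere in this module). Windows
# advance by chunk_size - overlap; the final window is clamped to the end.
def _demo_chunk_text() -> None:
    spans = chunk_text("abcdefghij " * 50, chunk_size=200, overlap=20)
    # Expected spans for this 550-char input: [0:200], [180:380], [360:550].
    for start, end, _chunk in spans:
        print(f"[{start}:{end}] len={end - start}")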
def project_paths(project_id: str) -> Dict[str, str]:
    base = os.path.join(DATA_DIR, project_id)
    return {
        "base": base,
        "ds_dir": os.path.join(base, "dataset"),
        "faiss_dir": os.path.join(base, "faiss"),
        "faiss_file": os.path.join(base, "faiss", "emb.faiss"),
        "meta_file": os.path.join(base, "meta.json"),
    }
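# Resulting on-disk layout for a project (all paths come from project_paths):
#
#   <DATA_DIR>/<project_id>/
#       dataset/            # Arrow dataset written by save_to_disk
#       faiss/emb.faiss     # serialized FAISS index
#       meta.json           # {"dim", "rows", "model", "ts"}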
def save_meta(meta_path: str, data: Dict[str, Any]) -> None:
    os.makedirs(os.path.dirname(meta_path), exist_ok=True)
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

def load_meta(meta_path: str) -> Dict[str, Any]:
    if not os.path.exists(meta_path):
        return {}
    try:
        with open(meta_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return {}
# ------------------------------------------------------------------------------
# Embeddings (HF Inference or dummy)
# ------------------------------------------------------------------------------
def _maybe_prefix_for_model(texts: List[str], model_name: str) -> List[str]:
    m = (model_name or "").lower()
    if "e5" in m:
        # E5 models expect "query: ..." / "passage: ..." prefixes; we keep it
        # simple and use the same prefix everywhere.
        return [("query: " + t) for t in texts]
    return texts
async def embed_hf(client: httpx.AsyncClient, texts: List[str], model: str = HF_EMBED_MODEL, token: str = HF_TOKEN) -> List[List[float]]:
    if not token:
        raise HTTPException(status_code=400, detail="HUGGINGFACEHUB_API_TOKEN missing for EMB_PROVIDER=hf")
    url = f"https://api-inference.huggingface.co/models/{model}"
    headers = {"Authorization": f"Bearer {token}"}
    inputs = _maybe_prefix_for_model(texts, model)
    payload = {"inputs": inputs, "options": {"wait_for_model": True}}
    LOG.debug(f"HF POST model={model} n_texts={len(texts)}")
    r = await client.post(url, headers=headers, json=payload, timeout=180)
    if r.status_code != 200:
        detail = r.text
        LOG.error(f"HF Inference error {r.status_code}: {detail[:400]}")
        raise HTTPException(status_code=502, detail=f"HF Inference error {r.status_code}: {detail}")
    data = r.json()
    embeddings: List[List[float]] = []
    if isinstance(data, list):
        for row in data:
            vec = flatten_any(row)
            embeddings.append(l2_normalize(vec))
    else:
        vec = flatten_any(data)
        embeddings.append(l2_normalize(vec))
    return embeddings
def embed_dummy(texts: List[str], dim: int = 128) -> List[List[float]]:
    out: List[List[float]] = []
    for t in texts:
        h = hashlib.sha256(t.encode("utf-8")).digest()
        arr = np.frombuffer((h * ((dim // len(h)) + 1))[:dim], dtype=np.uint8).astype(np.float32)
        arr = (arr - 127.5) / 127.5
        arr = arr / (np.linalg.norm(arr) + 1e-9)
        out.append(arr.astype(np.float32).tolist())
    return out
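# A small sketch (illustrative helper, an assumption: never called here) of
# the two properties queries rely on: embed_dummy is deterministic per input
# and returns unit-norm vectors, so inner product behaves like cosine.
def _demo_embed_dummy() -> None:
    a = embed_dummy(["hello"])[0]
    b = embed_dummy(["hello"])[0]
    assert a == b, "same text must embed identically"
    assert abs(float(np.linalg.norm(np.array(a, dtype=np.float32))) - 1.0) < 1e-5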
async def embed_texts(client: httpx.AsyncClient, texts: List[str]) -> List[List[float]]:
    if EMB_PROVIDER == "hf":
        try:
            return await embed_hf(client, texts)
        except Exception as e:
            if EMB_FALLBACK_TO_DUMMY:
                LOG.warning(f"Fallback embeddings → dummy (cause: {e})")
                return embed_dummy(texts, dim=128)
            raise
    return embed_dummy(texts, dim=128)
# ------------------------------------------------------------------------------
# Indexing (datasets + FAISS)
# ------------------------------------------------------------------------------
async def build_dataset_with_faiss(job: JobState, req: IndexRequest) -> None:
    """
    Builds a Hugging Face dataset with columns:
    - path (str), text (optional), chunk (int), start (int), end (int), embedding (float32[])
    Adds a FAISS index (Inner Product) and persists everything to disk.
    """
    try:
        job.stage = "embedding"
        job.total_files = len(req.files)
        job.log(
            f"Index start project={req.project_id} files={len(req.files)} "
            f"chunk_size={req.chunk_size} overlap={req.overlap} batch_size={req.batch_size} store_text={req.store_text} "
            f"provider={EMB_PROVIDER} model={HF_EMBED_MODEL}"
        )
        # Chunking
        records: List[Dict[str, Any]] = []
        for f in req.files:
            chunks = chunk_text(f.text, req.chunk_size, req.overlap)
            if not chunks:
                job.log(f"{f.path}: 0 chunks (too short or empty)")
            for idx, (start, end, ch) in enumerate(chunks):
                payload = {"path": f.path, "chunk": idx, "start": start, "end": end}
                # "raw" always holds the chunk text for embedding; "text" is
                # only persisted when store_text is requested.
                payload["raw"] = ch
                payload["text"] = ch if req.store_text else None
                records.append(payload)
        job.total_chunks = len(records)
        job.log(f"Total chunks = {job.total_chunks}")
        if job.total_chunks == 0:
            job.stage = "failed"
            job.errors.append("Nothing to index (no chunks).")
            job.finished_at = time.time()
            return
        # Batched embeddings
        async with httpx.AsyncClient(timeout=180) as client:
            all_vecs: List[List[float]] = []
            B = max(8, min(64, req.batch_size * 2))  # effective batch clamped to [8, 64]
            i = 0
            while i < len(records):
                sub = records[i : i + B]
                texts = [r["raw"] for r in sub]
                vecs = await embed_texts(client, texts)
                if len(vecs) != len(sub):
                    raise HTTPException(status_code=500, detail="Embedding batch size mismatch")
                all_vecs.extend(vecs)
                job.embedded += len(vecs)
                job.log(f"Embeddings {job.embedded}/{job.total_chunks}")
                i += B
        vec_dim = len(all_vecs[0])
        job.log(f"Embeddings dim={vec_dim}")
        # Prepare dataset columns
        paths = [r["path"] for r in records]
        chunks = [int(r["chunk"]) for r in records]
        starts = [int(r["start"]) for r in records]
        ends = [int(r["end"]) for r in records]
        texts = [r.get("text") for r in records]
        features = Features({
            "path": Value("string"),
            "chunk": Value("int32"),
            "start": Value("int32"),
            "end": Value("int32"),
            "text": Value("string"),
            "embedding": Sequence(Value("float32")),
        })
        ds = Dataset.from_dict(
            {
                "path": paths,
                "chunk": chunks,
                "start": starts,
                "end": ends,
                "text": texts,
                "embedding": [np.array(v, dtype=np.float32) for v in all_vecs],
            },
            features=features,
        )
        # Add FAISS index (Inner Product on normalized vectors ~ cosine)
        job.stage = "indexing"
        ds.add_faiss_index(column="embedding", metric_type=faiss.METRIC_INNER_PRODUCT)
        job.indexed = ds.num_rows
        job.log(f"FAISS index added ({ds.num_rows} points)")
        # Persist to disk. datasets refuses save_to_disk while an index is
        # attached, so save the FAISS index first, drop it, write the table,
        # then re-attach the index for the in-memory cache.
        p = project_paths(req.project_id)
        os.makedirs(p["faiss_dir"], exist_ok=True)
        ds.save_faiss_index("embedding", p["faiss_file"])
        ds.drop_index("embedding")
        ds.save_to_disk(p["ds_dir"])
        ds.load_faiss_index("embedding", p["faiss_file"])
        save_meta(p["meta_file"], {"dim": vec_dim, "rows": ds.num_rows, "model": HF_EMBED_MODEL, "ts": time.time()})
        # Memory cache
        DATASETS[req.project_id] = (ds, vec_dim)
        job.stage = "done"
        job.finished_at = time.time()
        job.log(f"Dataset saved to {p['ds_dir']}, FAISS index → {p['faiss_file']}")
    except Exception as e:
        job.stage = "failed"
        job.errors.append(str(e))
        job.finished_at = time.time()
        job.log(f"❌ Exception: {e}")
def _run_job_in_thread(job: JobState, req: IndexRequest) -> None:
    def _runner():
        try:
            asyncio.run(build_dataset_with_faiss(job, req))
        except Exception as e:
            job.stage = "failed"
            job.errors.append(str(e))
            job.finished_at = time.time()
            job.log(f"❌ Thread exception: {e}")
    t = threading.Thread(target=_runner, daemon=True)
    t.start()

def create_and_start_job(req: IndexRequest) -> JobState:
    job_id = uuid.uuid4().hex[:12]
    job = JobState(job_id=job_id, project_id=req.project_id)
    JOBS[job_id] = job
    job.log(f"Job {job_id} created for project {req.project_id}")
    _run_job_in_thread(job, req)
    return job
# ------------------------------------------------------------------------------
# Loading / query helpers
# ------------------------------------------------------------------------------
def ensure_loaded(project_id: str) -> Tuple[Dataset, int]:
    """Loads the dataset + FAISS index from disk if not already cached in memory."""
    if project_id in DATASETS:
        return DATASETS[project_id]
    p = project_paths(project_id)
    if not os.path.exists(p["ds_dir"]):
        raise HTTPException(status_code=404, detail=f"No dataset for project {project_id}")
    ds = load_from_disk(p["ds_dir"])
    if os.path.exists(p["faiss_file"]):
        ds.load_faiss_index("embedding", p["faiss_file"])
    meta = load_meta(p["meta_file"])
    vec_dim = int(meta.get("dim", 0)) or len(ds[0]["embedding"])
    DATASETS[project_id] = (ds, vec_dim)
    return ds, vec_dim

async def embed_query(text: str) -> List[float]:
    async with httpx.AsyncClient(timeout=60) as client:
        vec = (await embed_texts(client, [text]))[0]
    return vec
# ------------------------------------------------------------------------------
# FastAPI app
# ------------------------------------------------------------------------------
fastapi_app = FastAPI(title="Remote Indexer - NoQdrant (Datasets+FAISS)")
fastapi_app.add_middleware(
    CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
)
# NOTE: the route decorators below were missing from the extracted source.
# Paths for /index, /count, /query, /wipe and /export_hub follow the module
# docstring; /health, /info, /debug/paths, / and /status are reasonable guesses.
@fastapi_app.get("/health")
async def health():
    return {"status": "ok", "emb_provider": EMB_PROVIDER, "model": HF_EMBED_MODEL, "data_dir": DATA_DIR}

@fastapi_app.get("/info")
async def api_info():
    return {
        "ok": True, "service": "remote-indexer-noqdrant",
        "emb_provider": EMB_PROVIDER, "hf_model": HF_EMBED_MODEL,
        "fallback_to_dummy": EMB_FALLBACK_TO_DUMMY,
        "data_dir": DATA_DIR, "ui_path": UI_PATH,
        "hub_export_enabled": bool(HF_DATASET_REPO and HfApi),
    }

@fastapi_app.get("/debug/paths")
async def debug_paths(project_id: Optional[str] = None):
    res = {"DATA_DIR": DATA_DIR, "cwd": os.getcwd()}
    if project_id:
        res["project_paths"] = project_paths(project_id)
    return res

@fastapi_app.get("/", include_in_schema=False)
async def root_redirect():
    return RedirectResponse(url=UI_PATH, status_code=307)

@fastapi_app.post("/wipe")
async def wipe(project_id: str = Query(..., min_length=1)):
    p = project_paths(project_id)
    if os.path.exists(p["base"]):
        shutil.rmtree(p["base"], ignore_errors=True)
    DATASETS.pop(project_id, None)
    return {"ok": True, "project_id": project_id, "removed": True}

@fastapi_app.post("/index")
async def index(req: IndexRequest):
    job = create_and_start_job(req)
    return {"job_id": job.job_id, "project_id": job.project_id}

@fastapi_app.get("/status/{job_id}")
async def status(job_id: str):
    job = JOBS.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Unknown job_id")
    return job.model_dump()

@fastapi_app.get("/count")
async def coll_count(project_id: str):
    try:
        ds, _ = ensure_loaded(project_id)
        return {"project_id": project_id, "count": ds.num_rows}
    except Exception as e:
        return {"project_id": project_id, "count": 0, "note": f"{e}"}

@fastapi_app.post("/query")
async def query(req: QueryRequest):
    ds, vec_dim = ensure_loaded(req.project_id)
    qvec = await embed_query(req.text)
    if len(qvec) != vec_dim:
        raise HTTPException(status_code=400, detail=f"Query dim {len(qvec)} ≠ index dim {vec_dim}")
    scores, ex = ds.get_nearest_examples("embedding", np.array(qvec, dtype=np.float32), k=req.top_k)
    results = []
    for s, path, chunk, text in zip(scores, ex["path"], ex["chunk"], ex["text"]):
        preview = ((text or "")[:160]).replace("\n", " ")
        results.append({"score": float(s), "path": path, "chunk": int(chunk), "preview": preview})
    return {"result": results, "k": req.top_k}
@fastapi_app.post("/export_hub")
async def export_hub(project_id: str = Query(..., min_length=1), repo_id: Optional[str] = None):
    if not HfApi or not HF_TOKEN:
        raise HTTPException(status_code=400, detail="huggingface_hub unavailable or HF token missing.")
    p = project_paths(project_id)
    if not os.path.exists(p["ds_dir"]):
        raise HTTPException(status_code=404, detail="No local dataset to export.")
    rid = (repo_id or HF_DATASET_REPO or "").strip()
    if not rid:
        raise HTTPException(status_code=400, detail="repo_id required (or HF_DATASET_REPO).")
    api = HfApi(token=HF_TOKEN)
    try:
        create_repo(rid, repo_type="dataset", exist_ok=True, token=HF_TOKEN)
    except Exception:
        pass
    # Zip the project folder in memory
    buf = io.BytesIO()
    base_dir = p["base"]
    zip_name = f"{project_id}_vectors.zip"
    import zipfile
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as z:
        for root, _, files in os.walk(base_dir):
            for fn in files:
                full = os.path.join(root, fn)
                rel = os.path.relpath(full, base_dir)
                z.write(full, arcname=rel)
    buf.seek(0)
    api.upload_file(
        path_or_fileobj=buf,
        path_in_repo=zip_name,
        repo_id=rid,
        repo_type="dataset",
    )
    return {"ok": True, "repo_id": rid, "file": zip_name}
# ------------------------------------------------------------------------------
# Gradio UI
# ------------------------------------------------------------------------------
def _default_two_docs() -> List[Dict[str, str]]:
    a = "Alpha bravo charlie delta echo foxtrot golf hotel india. " * 3
    b = "Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy. " * 3
    return [{"path": "a.txt", "text": a}, {"path": "b.txt", "text": b}]
async def ui_wipe(project: str):
    try:
        resp = await wipe(project)
        return f"✅ Wipe ok — project {resp['project_id']} cleared."
    except Exception as e:
        LOG.exception("wipe UI error")
        return f"❌ Wipe error: {e}"

async def ui_index_sample(project: str, chunk_size: int, overlap: int, batch_size: int, store_text: bool):
    files = _default_two_docs()
    req = IndexRequest(
        project_id=project,
        files=[FileItem(**f) for f in files],
        chunk_size=chunk_size,
        overlap=overlap,
        batch_size=batch_size,
        store_text=store_text,
    )
    try:
        job = create_and_start_job(req)
        return f"🚀 Job started: {job.job_id}", job.job_id
    except ValidationError as ve:
        return f"❌ Invalid payload: {ve}", ""
    except Exception as e:
        LOG.exception("index UI error")
        return f"❌ Index error: {e}", ""

async def ui_status(job_id: str):
    if not job_id.strip():
        return "⚠️ Provide a job_id"
    try:
        st = await status(job_id)
        lines = [f"Job {st['job_id']} — stage={st['stage']} files={st['total_files']} chunks={st['total_chunks']} embedded={st['embedded']} indexed={st['indexed']}"]
        lines += st.get("messages", [])[-80:]
        if st.get("errors"):
            lines.append("Errors:")
            lines += [f" - {e}" for e in st['errors']]
        return "\n".join(lines)
    except Exception as e:
        return f"❌ Status error: {e}"

async def ui_count(project: str):
    try:
        data = await coll_count(project)
        return f"📊 Count — project={project} → {data['count']} points" + (f" ({data.get('note')})" if 'note' in data else "")
    except Exception as e:
        LOG.exception("count UI error")
        return f"❌ Count error: {e}"

async def ui_query(project: str, text: str, topk: int):
    try:
        data = await query(QueryRequest(project_id=project, text=text, top_k=topk))
        hits = data.get("result", [])
        if not hits:
            return "No results."
        out = []
        for h in hits:
            out.append(f"{h['score']:.4f} — {h['path']} [chunk {h['chunk']}] — {h['preview']}…")
        return "\n".join(out)
    except Exception as e:
        LOG.exception("query UI error")
        return f"❌ Query error: {e}"

async def ui_export(project: str, repo_id: str):
    try:
        resp = await export_hub(project, repo_id or None)
        return f"📤 Export → dataset repo={resp['repo_id']} file={resp['file']}"
    except Exception as e:
        LOG.exception("export UI error")
        return f"❌ Export error: {e}"
with gr.Blocks(title="Remote Indexer — No-Qdrant (datasets+FAISS)", analytics_enabled=False) as ui:
    gr.Markdown("## 🧪 Remote Indexer — No-Qdrant (datasets+FAISS)\n"
                "Wipe → Index 2 docs → Status → Count → Query\n"
                f"- **Embeddings**: `{EMB_PROVIDER}` (model: `{HF_EMBED_MODEL}`) — "
                f"HF token present: `{'yes' if bool(HF_TOKEN) else 'no'}` — Dummy fallback: `{'on' if EMB_FALLBACK_TO_DUMMY else 'off'}`\n"
                f"- **Data dir**: `{DATA_DIR}` — **Hub export**: `{'on' if (HF_DATASET_REPO and HfApi) else 'off'}`")
    with gr.Row():
        project_tb = gr.Textbox(label="Project ID", value="DEEPWEB")
        jobid_tb = gr.Textbox(label="Job ID", value="", interactive=True)
    with gr.Row():
        wipe_btn = gr.Button("🧨 Wipe project", variant="stop")
        index_btn = gr.Button("🚀 Index 2 documents", variant="primary")
        count_btn = gr.Button("📊 Count points", variant="secondary")
    with gr.Row():
        chunk_size = gr.Slider(64, 1024, value=200, step=8, label="chunk_size")
        overlap = gr.Slider(0, 256, value=20, step=2, label="overlap")
        batch_size = gr.Slider(1, 128, value=32, step=1, label="batch_size")
        store_text = gr.Checkbox(value=True, label="store_text (payload)")
    out_log = gr.Textbox(lines=18, label="Logs / Results", interactive=False)
    with gr.Row():
        status_btn = gr.Button("📡 Status (refresh)")
        auto_chk = gr.Checkbox(False, label="⏱️ Auto-refresh status (2 s)")
    with gr.Row():
        query_tb = gr.Textbox(label="Query text", value="alpha bravo")
        topk = gr.Slider(1, 20, value=5, step=1, label="top_k")
        query_btn = gr.Button("🔎 Query")
    query_out = gr.Textbox(lines=10, label="Query results", interactive=False)
    with gr.Row():
        repo_tb = gr.Textbox(label="Hub dataset repo (e.g. user/deepweb_vectors)", value=os.getenv("HF_DATASET_REPO", ""))
        export_btn = gr.Button("📤 Export to Hub", variant="secondary")

    wipe_btn.click(ui_wipe, inputs=[project_tb], outputs=[out_log])
    index_btn.click(ui_index_sample, inputs=[project_tb, chunk_size, overlap, batch_size, store_text], outputs=[out_log, jobid_tb])
    count_btn.click(ui_count, inputs=[project_tb], outputs=[out_log])
    status_btn.click(ui_status, inputs=[jobid_tb], outputs=[out_log])
    timer = gr.Timer(2.0, active=False)
    timer.tick(ui_status, inputs=[jobid_tb], outputs=[out_log])
    auto_chk.change(lambda x: gr.update(active=x), inputs=auto_chk, outputs=timer)
    query_btn.click(ui_query, inputs=[project_tb, query_tb, topk], outputs=[query_out])
    export_btn.click(ui_export, inputs=[project_tb, repo_tb], outputs=[out_log])
# Mount the UI
app = gr.mount_gradio_app(fastapi_app, ui, path=UI_PATH)

if __name__ == "__main__":
    port = int(os.getenv("PORT", "7860"))
    LOG.info(f"Starting Uvicorn on 0.0.0.0:{port} (UI_PATH={UI_PATH})")
    uvicorn.run(app, host="0.0.0.0", port=port)