# -*- coding: utf-8 -*-
from __future__ import annotations
import os
import time
import uuid
import random
import logging
import hashlib
import re
import json
from typing import List, Optional, Dict, Any, Tuple

import numpy as np
import requests
from fastapi import FastAPI, BackgroundTasks, Header, HTTPException, Query
from pydantic import BaseModel, Field
# ======================================================================================
# Logging
# ======================================================================================
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s:%(message)s")
LOG = logging.getLogger("remote_indexer")

# ======================================================================================
# ENV (config)
# ======================================================================================
# Order of embeddings backends. E.g.: "deepinfra,hf"
EMB_BACKEND_ORDER = [
    s.strip().lower()
    for s in os.getenv("EMB_BACKEND_ORDER", os.getenv("EMB_BACKEND", "deepinfra,hf")).split(",")
    if s.strip()
]
# --- DeepInfra Embeddings (OpenAI-like) ---
DI_TOKEN = os.getenv("DEEPINFRA_API_KEY", "").strip()
DI_MODEL = os.getenv("DEEPINFRA_EMBED_MODEL", "BAAI/bge-m3").strip()
DI_URL = os.getenv("DEEPINFRA_EMBED_URL", "https://api.deepinfra.com/v1/openai/embeddings").strip()
DI_TIMEOUT = float(os.getenv("EMB_TIMEOUT_SEC", "120"))

# --- Hugging Face Inference API ---
HF_TOKEN = os.getenv("HF_API_TOKEN", "").strip()
HF_MODEL = os.getenv("HF_EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2").strip()
HF_URL_PIPE = os.getenv("HF_API_URL_PIPELINE", "").strip() or (
    f"https://api-inference.huggingface.co/pipeline/feature-extraction/{HF_MODEL}"
)
HF_URL_MODL = os.getenv("HF_API_URL_MODELS", "").strip() or (
    f"https://api-inference.huggingface.co/models/{HF_MODEL}"
)
HF_TIMEOUT = float(os.getenv("EMB_TIMEOUT_SEC", "120"))
HF_WAIT = os.getenv("HF_WAIT_FOR_MODEL", "true").lower() in ("1", "true", "yes", "on")

# --- Retries / backoff ---
RETRY_MAX = int(os.getenv("EMB_RETRY_MAX", "6"))
RETRY_BASE_SEC = float(os.getenv("EMB_RETRY_BASE", "1.6"))
RETRY_JITTER = float(os.getenv("EMB_RETRY_JITTER", "0.35"))
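# Illustrative .env sketch — values here are examples, not defaults of any particular
# deployment; only the variable names come from the getenv() calls above:
#   EMB_BACKEND_ORDER=deepinfra,hf
#   DEEPINFRA_API_KEY=di-...          # required if "deepinfra" is in the order
#   HF_API_TOKEN=hf_...               # required if "hf" is in the order
#   EMB_RETRY_MAX=6
#   EMB_RETRY_BASE=1.6
#   EMB_RETRY_JITTER=0.35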
# --- Vector store (Qdrant / Memory fallback) ---
VECTOR_STORE = os.getenv("VECTOR_STORE", "qdrant").strip().lower()
QDRANT_URL = os.getenv("QDRANT_URL", "").strip()
QDRANT_API = os.getenv("QDRANT_API_KEY", "").strip()
# Deterministic IDs enabled?
QDRANT_DETERMINISTIC_IDS = os.getenv("QDRANT_DETERMINISTIC_IDS", "true").lower() in ("1", "true", "yes", "on")
QDRANT_ID_MODE = os.getenv("QDRANT_ID_MODE", "uuid").strip().lower()  # uuid|int
# Automatic wipe before each /index (optional)
WIPE_BEFORE_INDEX = os.getenv("WIPE_BEFORE_INDEX", "false").lower() in ("1", "true", "yes", "on")

# --- API auth for this service (simple header) ---
AUTH_TOKEN = os.getenv("REMOTE_INDEX_TOKEN", "").strip()

LOG.info(f"Embeddings backend order = {EMB_BACKEND_ORDER}")
LOG.info(f"HF pipeline URL = {HF_URL_PIPE}")
LOG.info(f"HF models URL = {HF_URL_MODL}")
LOG.info(f"VECTOR_STORE = {VECTOR_STORE}")
if "deepinfra" in EMB_BACKEND_ORDER and not DI_TOKEN:
    LOG.warning("DEEPINFRA_API_KEY missing — DeepInfra attempts will fail.")
if "hf" in EMB_BACKEND_ORDER and not HF_TOKEN:
    LOG.warning("HF_API_TOKEN missing — HF attempts will fail.")
# ======================================================================================
# Vector Stores (Memory + Qdrant)
# ======================================================================================
try:
    from qdrant_client import QdrantClient
    from qdrant_client.http.models import VectorParams, Distance, PointStruct
except Exception:
    QdrantClient = None
    PointStruct = None


class BaseStore:
    def ensure_collection(self, name: str, dim: int): ...
    def upsert(self, name: str, vectors: np.ndarray, payloads: List[Dict[str, Any]]) -> int: ...
    def search(self, name: str, query_vec: np.ndarray, top_k: int) -> List[Dict[str, Any]]: ...
    def wipe(self, name: str): ...
class MemoryStore(BaseStore):
    """In-memory (volatile) store — fallback/tests."""
    def __init__(self):
        self.db: Dict[str, Dict[str, Any]] = {}  # name -> {"vecs": [np.ndarray], "payloads": [dict], "dim": int}

    def ensure_collection(self, name: str, dim: int):
        self.db.setdefault(name, {"vecs": [], "payloads": [], "dim": dim})

    def upsert(self, name: str, vectors: np.ndarray, payloads: List[Dict[str, Any]]) -> int:
        if name not in self.db:
            raise RuntimeError(f"MemoryStore: unknown collection {name}")
        if len(vectors) != len(payloads):
            raise ValueError("MemoryStore.upsert: vectors/payloads length mismatch")
        self.db[name]["vecs"].extend([np.asarray(v, dtype=np.float32) for v in vectors])
        self.db[name]["payloads"].extend(payloads)
        return len(vectors)

    def search(self, name: str, query_vec: np.ndarray, top_k: int) -> List[Dict[str, Any]]:
        if name not in self.db or not self.db[name]["vecs"]:
            return []
        mat = np.vstack(self.db[name]["vecs"]).astype(np.float32)  # [N, dim]
        q = query_vec.reshape(1, -1).astype(np.float32)
        sims = (mat @ q.T).ravel()  # cosine (embeddings are L2-normalized upstream)
        top_idx = np.argsort(-sims)[:top_k]
        out = []
        for i in top_idx:
            pl = dict(self.db[name]["payloads"][i])
            pl["_score"] = float(sims[i])
            out.append(pl)
        return out

    def wipe(self, name: str):
        self.db.pop(name, None)
def _stable_point_id_uuid(collection: str, payload: Dict[str, Any]) -> str:
    """
    Deterministic UUID v5: uuid5(NAMESPACE_URL, 'collection|path|chunk|start|end|BLAKE8(text)')
    """
    path = str(payload.get("path", ""))
    chunk = str(payload.get("chunk", ""))
    start = str(payload.get("start", ""))
    end = str(payload.get("end", ""))
    text = payload.get("text", "")
    # short hash of the text to stabilize the fingerprint without concatenating everything
    h = hashlib.blake2b((text or "").encode("utf-8", "ignore"), digest_size=8).hexdigest()
    base = f"{collection}|{path}|{chunk}|{start}|{end}|{h}"
    return str(uuid.uuid5(uuid.NAMESPACE_URL, base))
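# Determinism sketch: the same (collection, payload) pair always maps to the same point
# ID, so re-indexing an unchanged chunk overwrites the existing Qdrant point instead of
# adding a duplicate. Hypothetical example (values illustrative only):
#   pl = {"path": "a.py", "chunk": 0, "start": 0, "end": 10, "text": "hello"}
#   _stable_point_id_uuid("proj_x", pl) == _stable_point_id_uuid("proj_x", pl)  # True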
class QdrantStore(BaseStore):
    """Qdrant store — deterministic UUID IDs (default) or sequential integers."""
    def __init__(self, url: str, api_key: Optional[str] = None,
                 deterministic_ids: bool = True, id_mode: str = "uuid"):
        if QdrantClient is None or PointStruct is None:
            raise RuntimeError("qdrant_client not available")
        self.client = QdrantClient(url=url, api_key=api_key if api_key else None)
        self._next_ids: Dict[str, int] = {}
        self._deterministic = deterministic_ids
        self._id_mode = id_mode if id_mode in ("uuid", "int") else "uuid"

    def _init_next_id(self, name: str):
        try:
            cnt = self.client.count(collection_name=name, exact=True).count
        except Exception:
            cnt = 0
        self._next_ids[name] = int(cnt)

    def ensure_collection(self, name: str, dim: int):
        try:
            self.client.get_collection(name)
        except Exception:
            self.client.create_collection(
                collection_name=name,
                vectors_config=VectorParams(size=dim, distance=Distance.COSINE),
            )
        if name not in self._next_ids:
            self._init_next_id(name)

    def upsert(self, name: str, vectors: np.ndarray, payloads: List[Dict[str, Any]]) -> int:
        if vectors is None or len(vectors) == 0:
            return 0
        if len(vectors) != len(payloads):
            raise ValueError("QdrantStore.upsert: vectors/payloads length mismatch")
        points: List[PointStruct] = []
        added = 0
        if self._deterministic and self._id_mode == "uuid":
            # Deterministic UUIDs => works on Qdrant Cloud, replaces the point if it exists
            seen = set()
            for v, pl in zip(vectors, payloads):
                pid = _stable_point_id_uuid(name, pl)
                if pid in seen:
                    continue  # intra-batch dedup
                seen.add(pid)
                points.append(PointStruct(id=pid,
                                          vector=np.asarray(v, dtype=np.float32).tolist(),
                                          payload=pl))
            if points:
                self.client.upsert(collection_name=name, points=points)
                added = len(points)
        elif self._deterministic and self._id_mode == "int":
            # Deterministic ints (less useful when the goal is replacement)
            seen = set()
            for v, pl in zip(vectors, payloads):
                pid_str = _stable_point_id_uuid(name, pl)
                pid_int = uuid.UUID(pid_str).int >> 64
                if pid_int in seen:
                    continue
                seen.add(pid_int)
                points.append(PointStruct(id=int(pid_int),
                                          vector=np.asarray(v, dtype=np.float32).tolist(),
                                          payload=pl))
            if points:
                self.client.upsert(collection_name=name, points=points)
                added = len(points)
        else:
            # Sequential IDs -> append-only
            if name not in self._next_ids:
                self._init_next_id(name)
            start = self._next_ids[name]
            for i, (v, pl) in enumerate(zip(vectors, payloads)):
                points.append(PointStruct(id=start + i,
                                          vector=np.asarray(v, dtype=np.float32).tolist(),
                                          payload=pl))
            if points:
                self.client.upsert(collection_name=name, points=points)
                added = len(points)
            self._next_ids[name] += added
        LOG.debug(f"QdrantStore.upsert: +{added} points (deterministic={self._deterministic}, mode={self._id_mode})")
        return added

    def search(self, name: str, query_vec: np.ndarray, top_k: int) -> List[Dict[str, Any]]:
        qv = query_vec[0].astype(np.float32).tolist() if query_vec.ndim == 2 else query_vec.astype(np.float32).tolist()
        res = self.client.search(collection_name=name, query_vector=qv, limit=int(top_k))
        out = []
        for p in res:
            pl = p.payload or {}
            try:
                pl["_score"] = float(p.score)
            except Exception:
                pl["_score"] = None
            out.append(pl)
        return out

    def wipe(self, name: str):
        try:
            self.client.delete_collection(name)
        except Exception:
            pass
        self._next_ids.pop(name, None)
# Initialize the active store
try:
    if VECTOR_STORE == "qdrant" and QDRANT_URL:
        STORE: BaseStore = QdrantStore(
            QDRANT_URL,
            api_key=QDRANT_API if QDRANT_API else None,
            deterministic_ids=QDRANT_DETERMINISTIC_IDS,
            id_mode=QDRANT_ID_MODE,
        )
        _ = STORE.client.get_collections()  # ping
        LOG.info("Connected to Qdrant.")
        VECTOR_STORE_ACTIVE = "QdrantStore"
    else:
        raise RuntimeError("Qdrant not configured, falling back to memory.")
except Exception as e:
    LOG.error(f"Qdrant unavailable (could not connect to Qdrant: {e}) — falling back to memory.")
    STORE = MemoryStore()
    VECTOR_STORE_ACTIVE = "MemoryStore"
    LOG.warning("Vector store: MEMORY (fallback). Data is volatile (lost on restart).")
# ======================================================================================
# Pydantic I/O
# ======================================================================================
class FileIn(BaseModel):
    path: Optional[str] = ""  # lenient: accepts None
    text: Optional[str] = ""  # same here


class IndexRequest(BaseModel):
    project_id: str = Field(..., min_length=1)
    files: List[FileIn]
    chunk_size: int = 1200
    overlap: int = 200
    batch_size: int = 8
    store_text: bool = True


class QueryRequest(BaseModel):
    project_id: str
    query: str
    top_k: int = 6


class StatusBody(BaseModel):
    job_id: str
# ======================================================================================
# Jobs store (in-memory)
# ======================================================================================
JOBS: Dict[str, Dict[str, Any]] = {}  # {job_id: {"status": "...", "logs": [...], "created": ts}}


def _append_log(job_id: str, line: str):
    job = JOBS.get(job_id)
    if job:
        job["logs"].append(line)


def _set_status(job_id: str, status: str):
    job = JOBS.get(job_id)
    if job:
        job["status"] = status


def _auth(x_auth: Optional[str]):
    if AUTH_TOKEN and (x_auth or "") != AUTH_TOKEN:
        raise HTTPException(401, "Unauthorized")
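# Auth sketch: the protected endpoints below receive the token through an `X-Auth-Token`
# header (FastAPI derives the header name from the `x_auth_token` parameter). Illustrative
# call, assuming a local deployment with REMOTE_INDEX_TOKEN=secret:
#   curl -H "X-Auth-Token: secret" http://localhost:7860/status/<job_id>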
# ======================================================================================
# Embeddings backends + retry/fallback
# ======================================================================================
def _retry_sleep(attempt: int) -> float:
    back = (RETRY_BASE_SEC ** attempt)
    jitter = 1.0 + random.uniform(-RETRY_JITTER, RETRY_JITTER)
    return max(0.25, back * jitter)
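# Backoff sketch with the defaults (RETRY_BASE=1.6, jitter ±35%), jitter set aside:
#   attempt 0 -> ~1.0s, 1 -> ~1.6s, 2 -> ~2.6s, 3 -> ~4.1s, 4 -> ~6.6s, 5 -> ~10.5s
# (floored at 0.25s after jitter is applied).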
def _normalize_rows(arr: np.ndarray) -> np.ndarray:
    arr = np.asarray(arr, dtype=np.float32)
    norms = np.linalg.norm(arr, axis=1, keepdims=True) + 1e-12
    return (arr / norms).astype(np.float32)
def _di_post_embeddings_once(batch: List[str]) -> Tuple[np.ndarray, int]:
    if not DI_TOKEN:
        raise RuntimeError("DEEPINFRA_API_KEY missing (backend=deepinfra).")
    headers = {"Authorization": f"Bearer {DI_TOKEN}", "Content-Type": "application/json"}
    payload = {"model": DI_MODEL, "input": batch}
    r = requests.post(DI_URL, headers=headers, json=payload, timeout=DI_TIMEOUT)
    size = int(r.headers.get("Content-Length", "0") or 0)
    if r.status_code >= 400:
        LOG.error(f"DeepInfra error {r.status_code}: {r.text[:1000]}")
        r.raise_for_status()
    js = r.json()
    data = js.get("data")
    if not isinstance(data, list) or not data:
        raise RuntimeError(f"DeepInfra embeddings: invalid response {js}")
    embs = [d.get("embedding") for d in data]
    arr = np.asarray(embs, dtype=np.float32)
    if arr.ndim != 2:
        raise RuntimeError(f"DeepInfra: unexpected embeddings shape: {arr.shape}")
    return _normalize_rows(arr), size
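# Response shape this parser expects (OpenAI-style embeddings payload; the values and the
# extra keys shown are illustrative, only "data"/"embedding" are actually read above):
#   {"data": [{"embedding": [0.01, -0.02, ...]}, ...], "model": "...", "usage": {...}}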
def _hf_post_embeddings_once(batch: List[str]) -> Tuple[np.ndarray, int]:
    if not HF_TOKEN:
        raise RuntimeError("HF_API_TOKEN missing (backend=hf).")
    headers = {
        "Authorization": f"Bearer {HF_TOKEN}",
        "Content-Type": "application/json",
    }
    if HF_WAIT:
        headers["X-Wait-For-Model"] = "true"
        headers["X-Use-Cache"] = "true"

    def _call(url: str, payload: Dict[str, Any], extra_headers: Optional[Dict[str, str]] = None):
        h = dict(headers)
        if extra_headers:
            h.update(extra_headers)
        r = requests.post(url, headers=h, json=payload, timeout=HF_TIMEOUT)
        return r

    payload = {"inputs": batch if len(batch) > 1 else batch[0]}

    # 1) pipeline endpoint first
    r = _call(HF_URL_PIPE, payload)
    size = int(r.headers.get("Content-Length", "0") or 0)
    if r.status_code == 404:
        LOG.error("HF error 404: Not Found")
        LOG.warning(f"HF endpoint {HF_URL_PIPE} unavailable (404), falling back to alternative ...")
    elif r.status_code >= 400:
        LOG.error(f"HF error {r.status_code}: {r.text[:1000]}")
        r.raise_for_status()
    else:
        data = r.json()
        arr = np.array(data, dtype=np.float32)
        if arr.ndim == 3:
            arr = arr.mean(axis=1)  # mean-pool token embeddings
        if arr.ndim == 1:
            arr = arr.reshape(1, -1)
        if arr.ndim != 2:
            raise RuntimeError(f"HF: unexpected embeddings shape: {arr.shape}")
        return _normalize_rows(arr), size

    # 2) models endpoint fallback
    r2 = _call(HF_URL_MODL, payload)
    size2 = int(r2.headers.get("Content-Length", "0") or 0)
    if r2.status_code >= 400:
        LOG.error(f"HF error {r2.status_code}: {r2.text[:1000]}")
        if r2.status_code == 400 and "SentenceSimilarityPipeline" in (r2.text or ""):
            LOG.warning("HF MODELS picked the Similarity pipeline -> retry with ?task=feature-extraction + X-Task")
            r3 = _call(
                HF_URL_MODL + "?task=feature-extraction",
                payload,
                extra_headers={"X-Task": "feature-extraction"}
            )
            size3 = int(r3.headers.get("Content-Length", "0") or 0)
            if r3.status_code >= 400:
                LOG.error(f"HF error {r3.status_code}: {r3.text[:1000]}")
                r3.raise_for_status()
            data3 = r3.json()
            arr3 = np.array(data3, dtype=np.float32)
            if arr3.ndim == 3:
                arr3 = arr3.mean(axis=1)
            if arr3.ndim == 1:
                arr3 = arr3.reshape(1, -1)
            if arr3.ndim != 2:
                raise RuntimeError(f"HF: unexpected embeddings shape: {arr3.shape}")
            return _normalize_rows(arr3), size3
        else:
            r2.raise_for_status()
    data2 = r2.json()
    arr2 = np.array(data2, dtype=np.float32)
    if arr2.ndim == 3:
        arr2 = arr2.mean(axis=1)
    if arr2.ndim == 1:
        arr2 = arr2.reshape(1, -1)
    if arr2.ndim != 2:
        raise RuntimeError(f"HF: unexpected embeddings shape: {arr2.shape}")
    return _normalize_rows(arr2), size2
def _call_with_retries(func, batch: List[str], label: str, job_id: Optional[str] = None) -> Tuple[np.ndarray, int]:
    last_exc = None
    for attempt in range(RETRY_MAX):
        try:
            if job_id:
                _append_log(job_id, f"{label}: try {attempt+1}/{RETRY_MAX} (batch={len(batch)})")
            return func(batch)
        except requests.HTTPError as he:
            code = he.response.status_code if he.response is not None else "HTTP"
            retriable = code in (429, 500, 502, 503, 504)
            if not retriable:
                raise
            sleep_s = _retry_sleep(attempt)
            msg = f"{label}: HTTP {code}, retry in {sleep_s:.1f}s"
            LOG.warning(msg)
            _append_log(job_id, msg)
            time.sleep(sleep_s)
            last_exc = he
        except Exception as e:
            sleep_s = _retry_sleep(attempt)
            msg = f"{label}: error {type(e).__name__}: {e}, retry in {sleep_s:.1f}s"
            LOG.warning(msg)
            _append_log(job_id, msg)
            time.sleep(sleep_s)
            last_exc = e
    raise RuntimeError(f"{label}: retries exhausted: {last_exc}")
def _post_embeddings(batch: List[str], job_id: Optional[str] = None) -> Tuple[np.ndarray, int]:
    last_err = None
    for b in EMB_BACKEND_ORDER:
        if b == "deepinfra":
            try:
                return _call_with_retries(_di_post_embeddings_once, batch, "DeepInfra", job_id)
            except Exception as e:
                last_err = e
                _append_log(job_id, f"DeepInfra failed: {e}.")
                LOG.error(f"DeepInfra failed: {e}")
        elif b == "hf":
            try:
                return _call_with_retries(_hf_post_embeddings_once, batch, "HF", job_id)
            except Exception as e:
                last_err = e
                _append_log(job_id, f"HF failed: {e}.")
                LOG.error(f"HF failed: {e}")
                if "SentenceSimilarityPipeline" in str(e) and "deepinfra" not in EMB_BACKEND_ORDER:
                    _append_log(job_id, "Auto-fallback to DeepInfra (HF => SentenceSimilarity).")
                    try:
                        return _call_with_retries(_di_post_embeddings_once, batch, "DeepInfra", job_id)
                    except Exception as e2:
                        last_err = e2
                        _append_log(job_id, f"DeepInfra failed after HF: {e2}.")
                        LOG.error(f"DeepInfra failed after HF: {e2}")
        else:
            _append_log(job_id, f"Unknown backend ignored: {b}")
    raise RuntimeError(f"All backends failed: {last_err}")
# ======================================================================================
# Chunking helpers
# ======================================================================================
def _chunk_with_spans(text: str, size: int, overlap: int):
    n = len(text or "")
    if size <= 0:
        yield (0, n, text)
        return
    i = 0
    while i < n:
        j = min(n, i + size)
        yield (i, j, text[i:j])
        if j >= n:
            break  # last chunk emitted; stepping back by `overlap` would loop forever
        i = max(i + 1, j - overlap)  # always advance, even if overlap >= size
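# Span sketch for the defaults (size=1200, overlap=200) on a 3,000-char text:
#   (0, 1200), (1000, 2200), (2000, 3000) — each chunk overlaps the previous by 200 chars.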
def _clean_chunk_text(text: str) -> str:
    """
    Simple cleanup of JSON fragments / artifacts:
    - strips a truncated "indexed_at" field at the end,
    - strips stray braces/characters at the start/end,
    - collapses runs of blank lines,
    - tries to extract the text values if the chunk looks strongly like JSON.
    """
    if not text:
        return text
    t = (text or "").strip()
    # strip a typical suffix: , "indexed_at": "2025-..."}}
    t = re.sub(r',\s*"indexed_at"\s*:\s*"[^"]*"\s*}+\s*$', '', t, flags=re.IGNORECASE)
    # strip other common timestamp keys at the end if truncated
    t = re.sub(r',\s*"(created_at|timestamp|time|date)"\s*:\s*"[^"]*"\s*}+\s*$', '', t, flags=re.IGNORECASE)
    # strip lone braces or brackets at the start/end
    t = re.sub(r'^[\s\]\}\,]+', '', t)
    t = re.sub(r'[\s\]\}\,]+$', '', t)
    # if the chunk looks mostly like JSON (many ":" or "{"), try to extract its text values
    if t.count(':') >= 3 and (t.count('{') + t.count('}')) >= 1:
        try:
            j = json.loads(t)
            if isinstance(j, dict):
                # concatenate the relevant scalar values
                vals = []
                for v in j.values():
                    if isinstance(v, (str, int, float)):
                        vals.append(str(v))
                if vals:
                    t = " ".join(vals)
        except Exception:
            # ignore, keep t as-is
            pass
    # collapse blank lines
    t = re.sub(r'\n{3,}', '\n\n', t)
    return t.strip()
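# Cleanup sketch (illustrative input/output, not a captured sample):
#   in:  'summary": "Build passed", "indexed_at": "2025-01-01T00:00:00Z"}}'
#   out: 'summary": "Build passed"'
# The JSON-extraction branch only applies when the fragment still parses as a dict.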
# ======================================================================================
# Background task: indexing — FIXED VERSION (anti-dup & robustness additions)
# ======================================================================================
def run_index_job(job_id: str, req: IndexRequest):
    try:
        _set_status(job_id, "running")
        _append_log(job_id, f"Start project={req.project_id} files={len(req.files)} | backends={EMB_BACKEND_ORDER} | store={VECTOR_STORE} (deterministic_ids={QDRANT_DETERMINISTIC_IDS}, mode={QDRANT_ID_MODE})")
        LOG.info(f"[{job_id}] Index start project={req.project_id} files={len(req.files)}")

        # set of chunk hashes already seen in this job (intra-job dedup)
        seen_chunk_hashes = set()

        # --- DEBUG DIAGNOSTIC ---
        try:
            N_SAMPLE = 6
            sample = req.files[:N_SAMPLE]
            seen_hashes = {}
            for fidx, fi in enumerate(sample, 1):
                p = (getattr(fi, "path", "") or "") or ""
                t = (getattr(fi, "text", "") or "") or ""
                h = hashlib.blake2b((t or "").encode("utf-8", "ignore"), digest_size=8).hexdigest()
                seen_hashes.setdefault(h, []).append(p)
                LOG.info(f"[{job_id}] recv file #{fidx}: path={p!r} len_text={len(t)} hash8={h} preview={repr(t[:120])}")
            if len(req.files) > N_SAMPLE:
                LOG.info(f"[{job_id}] ... and {len(req.files)-N_SAMPLE} more files")
            if len(seen_hashes) == 1 and len(req.files) > 1:
                _append_log(job_id, "⚠️ All received files appear IDENTICAL (same hash). Possible client-side bug.")
                LOG.warning("[%s] All files identical by hash8=%s", job_id, list(seen_hashes.keys())[0])
        except Exception as _e:
            LOG.exception("Debug sample failed: %s", _e)
        # --- end debug block ---
| col = f"proj_{req.project_id}" | |
| # Option: wipe avant index | |
| if WIPE_BEFORE_INDEX: | |
| try: | |
| STORE.wipe(col) | |
| _append_log(job_id, f"Wiped existing collection: {col}") | |
| except Exception as e: | |
| _append_log(job_id, f"Wipe failed (ignored): {e}") | |
| # --- WARMUP: calculer un embedding de test pour déterminer la dimension (dim) --- | |
| # On prend un chunk de départ (ou une string 'warmup' si pas de fichiers) | |
| if req.files: | |
| warm_text = next(_chunk_with_spans((req.files[0].text or "") , req.chunk_size, req.overlap))[2] | |
| else: | |
| warm_text = "warmup" | |
| try: | |
| embs, sz = _post_embeddings([warm_text], job_id=job_id) | |
| if embs is None or embs.ndim != 2: | |
| raise RuntimeError("Warmup embeddings invalid shape") | |
| dim = int(embs.shape[1]) | |
| LOG.info(f"[{job_id}] warmup embeddings shape = {embs.shape} dtype={embs.dtype}") | |
| _append_log(job_id, f"warmup embeddings shape = {embs.shape} dim={dim}") | |
| except Exception as e: | |
| LOG.exception("[%s] Warmup embeddings failed: %s", job_id, e) | |
| _append_log(job_id, f"Warmup embeddings failed: {e}") | |
| _set_status(job_id, "error") | |
| return | |
        # If using QdrantStore: check the existing collection's vector size and warn on mismatch
        if isinstance(STORE, QdrantStore):
            try:
                # client.get_collection throws if the collection does not exist
                info = STORE.client.get_collection(collection_name=col)
                existing_size = None
                # depending on the qdrant client version, the structure may differ:
                if hasattr(info, "result") and isinstance(info.result, dict):
                    cfg = info.result.get("params") or {}
                    vectors = cfg.get("vectors") or {}
                    existing_size = int(vectors.get("size")) if vectors.get("size") else None
                elif isinstance(info, dict):
                    cfg = info.get("result", info)
                    vectors = cfg.get("params", {}).get("vectors", {})
                    existing_size = int(vectors.get("size")) if vectors else None
                if existing_size and existing_size != dim:
                    msg = (f"Qdrant collection {col} already exists with dim={existing_size} but embeddings dim={dim}. "
                           "This will likely cause vectors to be rejected. Consider wiping or recreating the collection.")
                    LOG.error("[%s] %s", job_id, msg)
                    _append_log(job_id, msg)
                    # Optional: if WIPE_BEFORE_INDEX is True, recreate:
                    if WIPE_BEFORE_INDEX:
                        try:
                            STORE.wipe(col)
                            STORE.ensure_collection(col, dim)
                            _append_log(job_id, f"Recreated collection {col} with dim={dim} (WIPE_BEFORE_INDEX).")
                        except Exception as e:
                            _append_log(job_id, f"Failed to recreate collection: {e}")
            except Exception as e:
                # collection not present or not introspectable -> fine, ensure_collection will create it
                LOG.debug("[%s] Could not introspect collection: %s", job_id, e)

        STORE.ensure_collection(col, dim)
        _append_log(job_id, f"Collection ready: {col} (dim={dim})")
        total_chunks = 0
        buf_chunks: List[str] = []
        buf_metas: List[Dict[str, Any]] = []

        def _flush():
            nonlocal buf_chunks, buf_metas, total_chunks
            if not buf_chunks:
                return
            # ✅ INTRA-BATCH DEDUP (same text → same ID)
            if QDRANT_DETERMINISTIC_IDS:
                before = len(buf_metas)
                seen = set()
                dedup_chunks, dedup_metas = [], []
                for txt, meta in zip(buf_chunks, buf_metas):
                    pid = _stable_point_id_uuid(col, meta) if QDRANT_ID_MODE == "uuid" else uuid.UUID(_stable_point_id_uuid(col, meta)).int >> 64
                    if pid in seen:
                        continue
                    seen.add(pid)
                    dedup_chunks.append(txt)
                    dedup_metas.append(meta)
                buf_chunks, buf_metas = dedup_chunks, dedup_metas
                skipped = before - len(buf_metas)
                if skipped > 0:
                    _append_log(job_id, f"Dedup intra-batch: skipped {skipped} duplicates")
            try:
                vecs, sz = _post_embeddings(buf_chunks, job_id=job_id)
            except Exception as e:
                # failure -> log it and fail the job cleanly (keep the buffer for debugging, but stop)
                LOG.exception("[%s] Embeddings failed during flush: %s", job_id, e)
                _append_log(job_id, f"Embeddings failed during flush: {e}")
                _set_status(job_id, "error")
                raise
            added = STORE.upsert(col, vecs, buf_metas)
            total_chunks += added
            _append_log(job_id, f"+{added} chunks (total={total_chunks}) ~{(sz/1024.0):.1f}KiB")
            # clear buffers ONLY after success
            buf_chunks, buf_metas = [], []
        # ✅ Filter relevant files
        TEXT_EXTS = {".py", ".md", ".txt", ".yaml", ".yml", ".json", ".sh", ".dockerfile", ".ini", ".cfg", ".toml", ".env"}
        IGNORE_PREFIXES = {".git", "__pycache__", ".vscode", ".idea", "node_modules", "build", "dist", "venv", ".env", ".log", ".tmp"}

        for fi, f in enumerate(req.files, 1):
            # defensive: path/text may be None -> use fallbacks
            path_raw = (getattr(f, "path", "") or "")  # may be None
            path = (path_raw or "").strip()
            text_raw = (getattr(f, "text", "") or "")
            text = text_raw or ""
            if not path:
                # stable fallback path based on the text hash (avoids None collisions)
                h8 = hashlib.blake2b((text or "").encode("utf-8", "ignore"), digest_size=8).hexdigest()
                path = f"__no_path__{h8}"
            if any(path.startswith(p) for p in IGNORE_PREFIXES):
                _append_log(job_id, f"📁 Ignored: {path} (ignored directory)")
                continue
            ext = os.path.splitext(path)[1].lower()
            if ext not in TEXT_EXTS:
                _append_log(job_id, f"📁 Ignored: {path} (unsupported extension: {ext})")
                continue
            if len((text or "").strip()) < 50:  # ✅ skip files that are too short
                _append_log(job_id, f"📄 Ignored: {path} (text too short: {len((text or '').strip())} chars)")
                continue
            _append_log(job_id, f"📄 Processing: {path} ({len(text)} chars)")
            # --- special handling for JSON / NDJSON ---
            if ext in {".json"} or path.lower().endswith(".ndjson"):
                handled = False
                try:
                    parsed = json.loads(text)
                    # if it is a list -> index each entry separately
                    if isinstance(parsed, list):
                        for idx, obj in enumerate(parsed):
                            if isinstance(obj, dict):
                                s = " ".join(str(v) for v in obj.values() if isinstance(v, (str, int, float)))
                            else:
                                s = str(obj)
                            s = _clean_chunk_text(s)
                            if len(s) < 30:
                                continue
                            # global intra-job dedup
                            chash = hashlib.blake2b(s.encode("utf-8", "ignore"), digest_size=8).hexdigest()
                            if chash in seen_chunk_hashes:
                                continue
                            seen_chunk_hashes.add(chash)
                            meta = {"path": path, "chunk": idx, "start": 0, "end": len(s)}
                            if req.store_text:
                                meta["text"] = s
                            buf_chunks.append(s)
                            buf_metas.append(meta)
                            if len(buf_chunks) >= req.batch_size:
                                _flush()
                        handled = True
                    elif isinstance(parsed, dict):
                        s = " ".join(str(v) for v in parsed.values() if isinstance(v, (str, int, float)))
                        s = _clean_chunk_text(s)
                        if len(s) >= 30:
                            chash = hashlib.blake2b(s.encode("utf-8", "ignore"), digest_size=8).hexdigest()
                            if chash not in seen_chunk_hashes:
                                seen_chunk_hashes.add(chash)
                                meta = {"path": path, "chunk": 0, "start": 0, "end": len(s)}
                                if req.store_text:
                                    meta["text"] = s
                                buf_chunks.append(s)
                                buf_metas.append(meta)
                                if len(buf_chunks) >= req.batch_size:
                                    _flush()
                        handled = True
                except Exception:
                    # NDJSON fallback: one line == one JSON object or plain text
                    try:
                        lines = [L for L in (text or "").splitlines() if L.strip()]
                        for li, line in enumerate(lines):
                            try:
                                obj = json.loads(line)
                                if isinstance(obj, dict):
                                    s = " ".join(str(v) for v in obj.values() if isinstance(v, (str, int, float)))
                                else:
                                    s = str(obj)
                                s = _clean_chunk_text(s)
                                if len(s) < 30:
                                    continue
                                chash = hashlib.blake2b(s.encode("utf-8", "ignore"), digest_size=8).hexdigest()
                                if chash in seen_chunk_hashes:
                                    continue
                                seen_chunk_hashes.add(chash)
                                meta = {"path": path, "chunk": li, "start": 0, "end": len(s)}
                                if req.store_text:
                                    meta["text"] = s
                                buf_chunks.append(s)
                                buf_metas.append(meta)
                                if len(buf_chunks) >= req.batch_size:
                                    _flush()
                            except Exception:
                                # non-JSON line -> index it as text if long enough
                                sl = (line or "").strip()
                                if len(sl) >= 30:
                                    sl = _clean_chunk_text(sl)
                                    chash = hashlib.blake2b(sl.encode("utf-8", "ignore"), digest_size=8).hexdigest()
                                    if chash in seen_chunk_hashes:
                                        continue
                                    seen_chunk_hashes.add(chash)
                                    meta = {"path": path, "chunk": li, "start": 0, "end": len(sl)}
                                    if req.store_text:
                                        meta["text"] = sl
                                    buf_chunks.append(sl)
                                    buf_metas.append(meta)
                                    if len(buf_chunks) >= req.batch_size:
                                        _flush()
                        handled = True
                    except Exception:
                        handled = False
                if handled:
                    _flush()
                    _append_log(job_id, f"File done: {path}")
                    continue  # move on to the next file
            # --- normal handling for text files ---
            for ci, (start, end, chunk_txt) in enumerate(_chunk_with_spans(text or "", req.chunk_size, req.overlap)):
                chunk_txt = (chunk_txt or "").strip()
                if len(chunk_txt) < 30:  # ✅ skip chunks that are too short
                    continue
                # cleanup to avoid JSON artifacts / timestamps
                chunk_txt = _clean_chunk_text(chunk_txt)
                if len(chunk_txt) < 30:
                    continue
                # global intra-job dedup (prevents repetitions)
                chash = hashlib.blake2b(chunk_txt.encode("utf-8", "ignore"), digest_size=8).hexdigest()
                if chash in seen_chunk_hashes:
                    continue
                seen_chunk_hashes.add(chash)
                buf_chunks.append(chunk_txt)
                meta = {
                    "path": path,
                    "chunk": ci,
                    "start": start,
                    "end": end,
                }
                if req.store_text:
                    meta["text"] = chunk_txt
                buf_metas.append(meta)
                if len(buf_chunks) >= req.batch_size:
                    _flush()

            # end-of-file flush
            _flush()
            _append_log(job_id, f"File done: {path}")

        _append_log(job_id, f"Done. chunks={total_chunks}")
        _set_status(job_id, "done")
        LOG.info(f"[{job_id}] Index finished. chunks={total_chunks}")
    except Exception as e:
        LOG.exception("Index job failed")
        _append_log(job_id, f"ERROR: {e}")
        _set_status(job_id, "error")
# ======================================================================================
# API — route paths follow the listing advertised in the "docs" field below
# ======================================================================================
app = FastAPI()


@app.get("/")
def root():
    return {
        "ok": True,
        "service": "remote-indexer",
        "backends": EMB_BACKEND_ORDER,
        "hf_url_pipeline": HF_URL_PIPE if "hf" in EMB_BACKEND_ORDER else None,
        "hf_url_models": HF_URL_MODL if "hf" in EMB_BACKEND_ORDER else None,
        "di_url": DI_URL if "deepinfra" in EMB_BACKEND_ORDER else None,
        "di_model": DI_MODEL if "deepinfra" in EMB_BACKEND_ORDER else None,
        "vector_store": VECTOR_STORE,
        "vector_store_active": "QdrantStore" if isinstance(STORE, QdrantStore) else "MemoryStore",
        "qdrant_deterministic_ids": QDRANT_DETERMINISTIC_IDS,
        "qdrant_id_mode": QDRANT_ID_MODE,
        "wipe_before_index": WIPE_BEFORE_INDEX,
        "docs": "/health, /index, /status/{job_id} | /status?job_id= | POST /status, /query, /wipe",
    }


@app.get("/health")
def health():
    return {"ok": True, "store": "QdrantStore" if isinstance(STORE, QdrantStore) else "MemoryStore"}


def _check_backend_ready():
    if "hf" in EMB_BACKEND_ORDER and not HF_TOKEN:
        raise HTTPException(400, "HF_API_TOKEN missing server-side (backend=hf).")
    if "deepinfra" in EMB_BACKEND_ORDER and not DI_TOKEN and EMB_BACKEND_ORDER == ["deepinfra"]:
        raise HTTPException(400, "DEEPINFRA_API_KEY missing server-side (backend=deepinfra).")
@app.post("/index")
def start_index(req: IndexRequest, background_tasks: BackgroundTasks, x_auth_token: Optional[str] = Header(default=None)):
    _auth(x_auth_token)
    _check_backend_ready()
    job_id = uuid.uuid4().hex[:12]
    JOBS[job_id] = {"status": "queued", "logs": [], "created": time.time()}
    LOG.info(f"Created job {job_id} for project {req.project_id}")
    _append_log(job_id, f"Job created: {job_id} project={req.project_id}")
    background_tasks.add_task(run_index_job, job_id, req)
    return {"job_id": job_id}
# --- 3 variants for /status ---
@app.get("/status/{job_id}")
def status_path(job_id: str, x_auth_token: Optional[str] = Header(default=None)):
    _auth(x_auth_token)
    j = JOBS.get(job_id)
    if not j:
        # more explicit JSON response to ease client-side debugging
        raise HTTPException(status_code=404, detail={"error": "unknown job", "advice": "POST /index to create a new job"})
    return {"status": j["status"], "logs": j["logs"][-1500:]}


@app.get("/status")
def status_query(job_id: str = Query(...), x_auth_token: Optional[str] = Header(default=None)):
    return status_path(job_id, x_auth_token)


@app.post("/status")
def status_post(body: StatusBody, x_auth_token: Optional[str] = Header(default=None)):
    return status_path(body.job_id, x_auth_token)
@app.post("/query")
def query(req: QueryRequest, x_auth_token: Optional[str] = Header(default=None)):
    _auth(x_auth_token)
    _check_backend_ready()
    vecs, _ = _post_embeddings([req.query])
    col = f"proj_{req.project_id}"
    try:
        results = STORE.search(col, vecs[0], int(req.top_k))
    except Exception as e:
        raise HTTPException(400, f"Search failed: {e}")
    out = []
    for pl in results:
        txt = pl.get("text")
        if txt and len(txt) > 800:
            txt = txt[:800] + "..."
        out.append({
            "path": pl.get("path"),
            "chunk": pl.get("chunk"),
            "start": pl.get("start"),
            "end": pl.get("end"),
            "text": txt,
            "score": float(pl.get("_score")) if pl.get("_score") is not None else None
        })
    return {"results": out}
@app.post("/wipe")
def wipe_collection(project_id: str, x_auth_token: Optional[str] = Header(default=None)):
    _auth(x_auth_token)
    col = f"proj_{project_id}"
    try:
        STORE.wipe(col)
        return {"ok": True}
    except Exception as e:
        raise HTTPException(400, f"wipe failed: {e}")
# ======================================================================================
# Entrypoint
# ======================================================================================
if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", "7860"))
    LOG.info(f"===== Application Startup on PORT {port} =====")
    uvicorn.run(app, host="0.0.0.0", port=port)