from __future__ import annotations

import ast
import json
import os
import re
import unicodedata
from typing import Any, Dict, List, Optional

import requests
from sentence_transformers import SentenceTransformer, util
from smolagents import CodeAgent, InferenceClientModel, Tool

# --- Runtime configuration read from the environment (with defaults) ---
HF_TIMEOUT = int(os.getenv("HF_TIMEOUT", "180"))  # seconds
HF_MAX_TOKENS = int(os.getenv("HF_MAX_TOKENS", "384"))
AGENT_MAX_STEPS = int(os.getenv("AGENT_MAX_STEPS", "6"))

# Order: preferred model first (overridable via HF_MODEL_ID), then fallbacks.
FALLBACK_MODELS = [
    os.getenv("HF_MODEL_ID") or "meta-llama/Meta-Llama-3.1-8B-Instruct",
    "Qwen/Qwen2.5-7B-Instruct",
    "HuggingFaceH4/zephyr-7b-beta",
]

# ---- Small COICOP reference list (demo) ----
COICOP_ITEMS = [
    {"code": "01.1.4.5.1", "label": "Laits caillés, fromage blanc, petites crèmes fromagères"},
    {"code": "01.1.4.5.2", "label": "Fromage à pâte molle et à pâte persillée"},
    {"code": "01.1.4.5.3", "label": "Fromage à pâte pressée"},
    {"code": "01.1.4.5.4", "label": "Fromage de chèvre"},
    {"code": "01.1.4.5.5", "label": "Fromages fondus, râpés, portions"},
    {"code": "01.1.1.4", "label": "Pain"},
    {"code": "01.1.1.1", "label": "Riz"},
    {"code": "01.1.1.3", "label": "Pâtes, couscous et produits similaires"},
]

# Code -> label map, including a label for the generic cheese code.
CODE_TO_LABEL = {it["code"]: it["label"] for it in COICOP_ITEMS}
CODE_TO_LABEL.setdefault("01.1.4.5", "Fromages (générique)")


def normalize_txt(s: str) -> str:
    """Upper-case *s*, strip accents and collapse everything but A-Z, 0-9, % and spaces."""
    if not s:
        return ""
    s = s.upper()
    s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
    s = re.sub(r"[^A-Z0-9% ]+", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s


def ean_check_digit_ok(ean: str) -> bool:
    """Return True when *ean* has 8/12/13/14 digits and a valid check digit.

    Non-digit characters are ignored; ``None`` or an empty string is invalid.
    """
    digits = re.sub(r"\D", "", ean or "")
    if len(digits) not in (8, 12, 13, 14):
        return False
    # Weights alternate 3, 1, 3, ... starting from the digit next to the check digit.
    total = sum(
        int(ch) * (3 if i % 2 == 1 else 1)
        for i, ch in enumerate(reversed(digits[:-1]), start=1)
    )
    check = (10 - (total % 10)) % 10
    return check == int(digits[-1])


# ---- ValidateEANTool ----
class ValidateEANTool(Tool):
    """Tool checking the check digit of an EAN/GTIN code."""

    name, description = "validate_ean", "Valide un EAN/GTIN (clé GS1)."
    inputs = {"ean": {"type": "string", "description": "Code EAN/GTIN (8/12/13/14 chiffres)."}}
    output_type = "object"

    def forward(self, ean: str):
        """Return ``{"valid": bool, "normalized": digits-only string}``."""
        digits = re.sub(r"\D", "", ean or "")
        if len(digits) not in (8, 12, 13, 14):
            return {"valid": False, "normalized": digits}
        total = 0
        for i, ch in enumerate(reversed(digits[:-1]), start=1):
            total += int(ch) * (3 if i % 2 == 1 else 1)
        check = (10 - (total % 10)) % 10
        return {"valid": check == int(digits[-1]), "normalized": digits}


# ---- OFFByEAN ----
class OFFByEAN(Tool):
    """Tool fetching a product from Open Food Facts by EAN."""

    name = "openfoodfacts_product_by_ean"
    description = "Open Food Facts /api/v0|v2/product/{ean} (name, brands, categories...)."
    inputs = {"ean": {"type": "string", "description": "EAN à interroger sur l'API OFF."}}
    output_type = "object"
    requirements = ["requests"]

    def forward(self, ean: str):
        """Query several Open Food Facts endpoints in turn and return the first hit.

        Returns a dict with ``ok: True`` and the extracted product fields, or
        ``ok: False`` with an ``error`` message when no endpoint yields a product.
        """
        import re
        from requests.adapters import HTTPAdapter

        try:
            from urllib3.util.retry import Retry
        except Exception:
            Retry = None

        def _to_list(x):
            # Accept None, a list, or a "," / "|" / ";" separated string.
            if x is None:
                return []
            if isinstance(x, list):
                return [str(t).strip() for t in x if str(t).strip()]
            if isinstance(x, str):
                return [p.strip() for p in re.split(r"[,\|;]", x) if p.strip()]
            return [str(x).strip()]

        def _first(*vals):
            # First non-blank string among vals, stripped; "" when none.
            for v in vals:
                if isinstance(v, str) and v.strip():
                    return v.strip()
            return ""

        code = re.sub(r"\D", "", ean or "")
        if not code:
            return {"ok": False, "status": 0, "code": "", "error": "EAN vide"}

        urls = [
            f"https://world.openfoodfacts.org/api/v0/product/{code}.json",
            "https://world.openfoodfacts.org/api/v2/product/"
            f"{code}?lc=fr&fields=code,product_name,product_name_fr,brands,labels_tags,"
            "categories_tags,categories_tags_fr,categories_hierarchy,ingredients,ingredients_text,"
            "ingredients_text_fr,ingredients_text_en,allergens,allergens_tags,traces,traces_tags,"
            "stores,status,status_verbose",
            f"https://world.openfoodfacts.net/api/v0/product/{code}.json",
        ]

        last_err = None
        # The context manager closes the session (and its pooled connections).
        with requests.Session() as sess:
            sess.headers.update({"User-Agent": "insee-coicop-agent/1.0", "Accept": "application/json"})
            if Retry:
                retry = Retry(
                    total=3,
                    backoff_factor=0.5,
                    status_forcelist=[429, 500, 502, 503, 504],
                    allowed_methods=frozenset(["GET"]),
                    raise_on_status=False,
                )
                sess.mount("https://", HTTPAdapter(max_retries=retry))

            for u in urls:
                # Best effort: any failure on one endpoint moves on to the next one.
                try:
                    r = sess.get(u, timeout=15)
                    if not r.ok:
                        last_err = f"HTTP {r.status_code}"
                        continue
                    data = r.json()
                    if not isinstance(data, dict):
                        last_err = "unexpected JSON payload"
                        continue
                    product = data.get("product")
                    status = data.get("status", 1 if product else 0)
                    if not (status == 1 or product):
                        continue

                    p = product if isinstance(product, dict) else {}
                    product_name = _first(p.get("product_name_fr"), p.get("product_name"))
                    categories_tags = _to_list(
                        p.get("categories_tags_fr") or p.get("categories_tags") or p.get("categories")
                    )
                    categories_hierarchy = _to_list(p.get("categories_hierarchy"))

                    # Ingredients: free text plus structured list.
                    ingredients_text = _first(
                        p.get("ingredients_text_fr"),
                        p.get("ingredients_text_en"),
                        p.get("ingredients_text"),
                    )
                    ingredients_list = []
                    if isinstance(p.get("ingredients"), list):
                        for it in p["ingredients"]:
                            if not isinstance(it, dict):
                                continue
                            txt = it.get("text") or it.get("id") or ""
                            if txt:
                                ingredients_list.append(str(txt).strip())

                    allergens = _first(p.get("allergens"), None)
                    allergens_tags = _to_list(p.get("allergens_tags"))
                    traces = _first(p.get("traces"), None)
                    traces_tags = _to_list(p.get("traces_tags"))
                    labels_tags = _to_list(p.get("labels_tags"))
                    brands = _first(p.get("brands"), None)
                    stores = _first(p.get("stores"), None)

                    return {
                        "ok": True,
                        "status": status,
                        "status_verbose": data.get("status_verbose"),
                        "code": code,
                        "used_url": u,
                        "product_name": product_name,
                        "categories_tags": categories_tags,
                        "categories_hierarchy": categories_hierarchy,
                        "ingredients_text": ingredients_text,
                        "ingredients_list": ingredients_list,
                        "allergens": allergens,
                        "allergens_tags": allergens_tags,
                        "traces": traces,
                        "traces_tags": traces_tags,
                        "labels_tags": labels_tags,
                        "brands": brands,
                        "brands_list": _to_list(brands),
                        "stores": stores,
                        "stores_list": _to_list(stores),
                        # Inputs ready to be fed to the mapping step.
                        "step3_inputs": {
                            "product_name": product_name,
                            "categories_tags": categories_tags,
                            "ingredients_text": ingredients_text,
                            "ingredients_list": ingredients_list,
                            "traces": traces,
                            "traces_tags": traces_tags,
                        },
                    }
                except Exception as e:
                    last_err = str(e)

        return {"ok": False, "status": 0, "code": code, "error": last_err or "not found"}


# ---- RegexCOICOP ----
class RegexCOICOP(Tool):
    """Tool proposing COICOP cheese candidates from keyword regexes on a label."""

    name, description = "coicop_regex_rules", "Règles regex → candidats COICOP."
    inputs = {"text": {"type": "string", "description": "Libellé produit (texte libre) à analyser."}}
    output_type = "object"

    import re as _re

    # Patterns are applied to normalized text (upper-case, accents stripped).
    SOFT = _re.compile(r"(?:\b|^)(?:CAMEMB(?:ERT)?|BRIE|COULOMMI(?:ERS?)?|BLEU|ROQUEFORT|GORGONZOLA|REBLOCHON|MUNSTER)(?:\b|$)")
    PRESS = _re.compile(r"(?:\b|^)(EMMENTAL|COMTE|CANTAL|MIMOLETTE|GOUDA|EDAM|BEAUFORT|ABONDANCE|SALERS|TOMME|TOME)(?:\b|$)")
    GOAT = _re.compile(r"(?:\b|^)(CHEVRE|STE MAURE|CROTTIN|BUCHE|PICODON|PELARDON|BANON)(?:\b|$)")
    PROC = _re.compile(r"(?:\b|^)(FONDU(?:ES?)?|FROMAGE FONDU|TOASTINETTES?|VACHE QUI RIT|KIRI|CARRE FRAIS|CARR[ÉE] FRAIS|PORTIONS?)(?:\b|$)|\bRAP[ÉE]?\b")

    @staticmethod
    def _normalize_txt(s: str) -> str:
        """Upper-case, strip accents, keep only A-Z, 0-9, % and single spaces."""
        import unicodedata, re
        if not s:
            return ""
        s = s.upper()
        s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
        s = re.sub(r"[^A-Z0-9% ]+", " ", s)
        return re.sub(r"\s+", " ", s).strip()

    def forward(self, text: str):
        """Return ``{"candidates": [...]}`` with one entry per matching family."""
        import re
        s = self._normalize_txt(text)
        c = []
        if self.SOFT.search(s):
            c.append({"code": "01.1.4.5.2", "why": "pâte molle/persillée", "score": 0.95})
        if self.PRESS.search(s):
            c.append({"code": "01.1.4.5.3", "why": "pâte pressée", "score": 0.90})
        if self.GOAT.search(s):
            c.append({"code": "01.1.4.5.4", "why": "chèvre", "score": 0.90})
        if self.PROC.search(s):
            c.append({"code": "01.1.4.5.5", "why": "fondu/râpé/portions", "score": 0.85})
        # Generic fallbacks are only tried while no specific family matched.
        if not c and re.search(r"\bFROMAGE\b", s):
            c.append({"code": "01.1.4.5", "why": "générique fromage/laits caillés", "score": 0.6})
        if not c and re.search(r"\bCR[ÉE]MEUX\b", s):
            c.append({"code": "01.1.4.5.1", "why": "mot-clé 'crémeux' (laits caillés/crèmes fromagères)", "score": 0.55})
        return {"candidates": c}


# ---- OFFtoCOICOP ----
class OFFtoCOICOP(Tool):
    """Tool mapping Open Food Facts fields to COICOP cheese candidates."""

    name, description = "map_off_to_coicop", "Mappe catégories OFF vers COICOP (off_payload ou champs séparés)."
    inputs = {
        "product_name": {"type": "string", "description": "Nom produit OFF (fr/en).", "nullable": True},
        "categories_tags": {"type": "array", "description": "Liste OFF categories_tags.", "nullable": True},
        "ingredients_text": {"type": "string", "description": "Texte ingrédients.", "nullable": True},
        "ingredients_list": {"type": "array", "description": "Liste structurée des ingrédients (strings).", "nullable": True},
        "traces": {"type": "string", "description": "Champ traces (fr).", "nullable": True},
        "traces_tags": {"type": "array", "description": "Tags de traces.", "nullable": True},
        # An object is allowed here (dict or string).
        "off_payload": {"type": "object", "description": "Sortie brute de l'étape 2 (dict OU string).", "nullable": True},
    }
    output_type = "object"

    import re as _re, json as _json, ast as _ast

    def _normalize_txt(self, s: str) -> str:
        """Upper-case, strip accents, keep only A-Z, 0-9, % and single spaces."""
        import unicodedata, re
        if not s:
            return ""
        s = s.upper()
        s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
        s = re.sub(r"[^A-Z0-9% ]+", " ", s)
        return re.sub(r"\s+", " ", s).strip()

    def _to_list(self, x):
        """Coerce None / list / delimited string / scalar into a list of stripped strings."""
        import re
        if x is None:
            return []
        if isinstance(x, list):
            return [str(t).strip() for t in x if str(t).strip()]
        if isinstance(x, str):
            return [p.strip() for p in re.split(r"[,\|;]", x) if p.strip()]
        return [str(x).strip()]

    def _safe_parse(self, x):
        """Return *x* as a dict: pass dicts through, parse strings as JSON then as a
        Python literal; anything that does not yield a dict becomes ``{}``."""
        if isinstance(x, dict):
            return x
        if not isinstance(x, str):
            return {}
        for parser in (self._json.loads, self._ast.literal_eval):
            try:
                parsed = parser(x)
            except Exception:
                continue
            if isinstance(parsed, dict):
                return parsed
        return {}

    # --- keywords per family (applied to normalized text)
    SOFT = _re.compile(r"\b(CAMEMBERT|BRIE|COULOMMIERS|BLUE CHEESE|ROQUEFORT|GORGONZOLA|MUNSTER|REBLOCHON)\b")
    PRESS = _re.compile(r"\b(EMMENTAL|COMTE|CANTAL|MIMOLETTE|GOUDA|EDAM|BEAUFORT|ABONDANCE|SALERS|TOMME|TOME)\b")
    GOAT = _re.compile(r"\b(CHEVRE|CH[ÈE]VRE|STE MAURE|CROTTIN|BUCHE|BUCHETTE|PICODON|PELARDON|BANON)\b")
    PROC = _re.compile(r"\b(FONDU|FONDUES?|RAPE|RÂPE|PORTIONS?|KIRI|VACHE QUI RIT|CARRE FRAIS|CARR[ÉE] FRAIS|TOASTINETTES?)\b")
    GENERIC_FROMAGE = _re.compile(r"\bFROMAGE[S]?\b")
    CREMEUX = _re.compile(r"\bCR[ÉE]MEUX\b")
    EN_CHEESE = _re.compile(r"\bCHEESE(S)?\b")

    # --- removal of "may contain traces of ..." clauses
    _TRACES_BLOCK = _re.compile(
        r"(PEUT\s+CONTENIR\s+DES\s+TRACES\s+DE\s+[^.;\)\]]+)|"
        r"(MAY\s+CONTAIN\s+TRACES\s+OF\s+[^.;\)\]]+)|"
        r"(\bTRACES?\s+DE\s+[^.;\)\]]+)",
        _re.I,
    )

    def _without_traces(self, s: str) -> str:
        """Return *s* with every traces clause replaced by a space."""
        if not s:
            return ""
        return self._TRACES_BLOCK.sub(" ", s)

    def _mk(self, code, base, why, source):
        """Build a candidate whose score is *base* plus a small per-source boost."""
        boost = {"name": 0.05, "cat": 0.04, "ing_no_traces": 0.03, "ing": 0.01}.get(source, 0.0)
        return {"code": code, "score": round(base + boost, 4), "why": f"{why} (source:{source})"}

    def _pad_min3(self, ranked, hint_is_cheese=False):
        """Append deterministic fallback codes (no duplicates) until 3 candidates exist."""
        fallback_order = ["01.1.4.5.2", "01.1.4.5.3", "01.1.4.5.5", "01.1.4.5.1", "01.1.4.5"]
        present = {r["code"] for r in ranked}
        for code in fallback_order:
            if len(ranked) >= 3:
                break
            if code in present:
                continue
            why = "fallback générique fromage" if hint_is_cheese else "fallback faible (peu d'indices)"
            base = 0.52 if hint_is_cheese else 0.48
            ranked.append({"code": code, "score": base, "why": why})
            present.add(code)
        return ranked[:3]

    def forward(self, product_name=None, categories_tags=None, ingredients_text=None,
                ingredients_list=None, traces=None, traces_tags=None, off_payload=None):
        """Return ``{"candidates": [...]}`` with exactly the top 3 scored codes.

        When no separate field is given, fields are read from *off_payload*
        (dict or JSON/literal string), falling back to its ``step3_inputs``.
        """
        if off_payload and not (product_name or categories_tags or ingredients_text
                                or ingredients_list or traces or traces_tags):
            data = self._safe_parse(off_payload)
            step3 = data.get("step3_inputs")
            if not isinstance(step3, dict):
                step3 = {}
            product_name = data.get("product_name") or step3.get("product_name") or ""
            categories_tags = self._to_list(data.get("categories_tags") or step3.get("categories_tags"))
            ingredients_text = data.get("ingredients_text") or step3.get("ingredients_text") or ""
            # Fall back to step3_inputs like every other field.
            ingredients_list = self._to_list(data.get("ingredients_list") or step3.get("ingredients_list"))
            traces = data.get("traces") or step3.get("traces") or ""
            traces_tags = self._to_list(data.get("traces_tags") or step3.get("traces_tags"))

        # Normalizations
        name = self._normalize_txt(product_name or "")
        cats = self._normalize_txt(" ".join(self._to_list(categories_tags)))
        ingt = self._normalize_txt(ingredients_text or "")
        ingt_no_tr = self._normalize_txt(self._without_traces(ingredients_text or ""))
        ing_join = " ".join(self._normalize_txt(x) for x in self._to_list(ingredients_list))
        ing_join_no_tr = self._normalize_txt(self._without_traces(ing_join))

        # Broad "this is cheese" hint. Tags such as "en:cheeses" are covered by the
        # regexes because normalization turns ":" into a space.
        hint_is_cheese = bool(
            self.GENERIC_FROMAGE.search(name)
            or self.GENERIC_FROMAGE.search(cats)
            or self.EN_CHEESE.search(cats)
            or "FROMAGES" in cats
        )

        # (pattern, code, description, base score for name/categories,
        #  base for ingredients without traces, base for raw ingredients or None)
        families = (
            (self.SOFT, "01.1.4.5.2", "pâte molle/persillée", 0.90, 0.84, 0.78),
            (self.PRESS, "01.1.4.5.3", "pâte pressée", 0.87, 0.82, 0.76),
            # Raw ingredients never trigger the goat family.
            (self.GOAT, "01.1.4.5.4", "chèvre", 0.88, 0.83, None),
            (self.PROC, "01.1.4.5.5", "fondu/râpé/portions", 0.86, 0.80, 0.74),
        )

        c = []
        # 1) Product name and categories (strong evidence)
        for rx, code, desc, base, _, _ in families:
            in_name = rx.search(name)
            if in_name or rx.search(cats):
                c.append(self._mk(code, base, f"OFF: {desc}", "name" if in_name else "cat"))
        # 2) Ingredients with traces clauses removed (medium evidence)
        for rx, code, desc, _, base, _ in families:
            if rx.search(ingt_no_tr) or rx.search(ing_join_no_tr):
                c.append(self._mk(code, base, f"Ingrédients (sans traces): {desc}", "ing_no_traces"))
        # 3) Raw ingredients (weak evidence)
        for rx, code, desc, _, _, base in families:
            if base is not None and (rx.search(ingt) or rx.search(ing_join)):
                c.append(self._mk(code, base, f"Ingrédients: {desc}", "ing"))

        # 4) Generic candidates when nothing specific matched
        if not c and (hint_is_cheese or self.CREMEUX.search(name)):
            c.extend([
                {"code": "01.1.4.5", "score": 0.62, "why": "OFF: générique fromage"},
                {"code": "01.1.4.5.2", "score": 0.60, "why": "fallback fromage (molle/persillée)"},
                {"code": "01.1.4.5.3", "score": 0.59, "why": "fallback fromage (pressée)"},
            ])

        # Deduplicate per code: keep the best score, accumulate all reasons.
        bucket = {}
        for ci in c:
            code = ci["code"]
            if code not in bucket:
                bucket[code] = {**ci, "why_list": [ci.get("why", "")]}
            else:
                if ci["score"] > bucket[code]["score"]:
                    bucket[code].update({"score": ci["score"], "why": ci.get("why", "")})
                bucket[code]["why_list"].append(ci.get("why", ""))

        ranked = sorted(bucket.values(), key=lambda x: x["score"], reverse=True)

        # Always at least 3 candidates (padded when needed).
        if len(ranked) < 3:
            ranked = self._pad_min3(ranked, hint_is_cheese=hint_is_cheese)

        return {"candidates": ranked[:3]}


# ---- SemSim ----
class SemSim(Tool):
    """Tool ranking COICOP items by embedding similarity with a label."""

    name, description = "coicop_semantic_similarity", "Embeddings → top-k COICOP."
    inputs = {"text": {"type": "string", "description": "Texte libellé"},
              "topk": {"type": "integer", "description": "Nombre de candidats (défaut 5)", "nullable": True}}
    output_type = "object"
    requirements = ["sentence_transformers", "torch"]

    COICOP_ITEMS = COICOP_ITEMS

    @staticmethod
    def _normalize_txt(s: str) -> str:
        """Upper-case, strip accents, keep only A-Z, 0-9, % and single spaces."""
        import unicodedata, re
        if not s:
            return ""
        s = s.upper()
        s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
        s = re.sub(r"[^A-Z0-9% ]+", " ", s)
        return re.sub(r"\s+", " ", s).strip()

    def forward(self, text: str, topk: int = 5):
        """Return the ``topk`` (at least 1) items most similar to *text*.

        ``topk=None`` is treated as the default of 5, since the input is nullable.
        """
        if not hasattr(self, "_model"):
            self._model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
        if not hasattr(self, "_label_emb"):
            # Label embeddings do not depend on the query: compute them once.
            labels = [f"{it['code']} {it['label']}" for it in self.COICOP_ITEMS]
            self._label_emb = self._model.encode(labels, normalize_embeddings=True)

        q = self._normalize_txt(text)
        q_emb = self._model.encode([q], normalize_embeddings=True)
        sims = util.cos_sim(q_emb, self._label_emb).tolist()[0]
        ranked = sorted(
            [{"code": it["code"], "label": it["label"], "score": float(sim)}
             for it, sim in zip(self.COICOP_ITEMS, sims)],
            key=lambda x: x["score"], reverse=True,
        )
        k = max(1, int(5 if topk is None else topk))
        return {"candidates": ranked[:k]}


# ---- Web tools (search & read) ----
class WebSearch(Tool):
    """Tool running a lightweight web search against the DuckDuckGo HTML page."""

    name = "web_search"
    description = "Recherche web légère (DuckDuckGo HTML). Entrée: query (fr/en). Retour: top résultats avec titre, url, snippet."
    inputs = {"query": {"type": "string", "description": "Requête de recherche web."}}
    output_type = "object"
    requirements = ["requests"]

    def forward(self, query: str):
        """Return up to 8 results as ``{"title", "url", "snippet"}`` dicts."""
        import html

        try:
            with requests.Session() as sess:
                sess.headers.update({"User-Agent": "insee-coicop-agent/1.0"})
                r = sess.get("https://duckduckgo.com/html/", params={"q": query, "kl": "fr-fr"}, timeout=15)
                r.raise_for_status()
                page = r.text
        except Exception as e:
            return {"ok": False, "error": str(e), "results": []}

        # Very simple parsing, without a heavy dependency.
        # Result titles are anchors of class "result__a"; snippets are anchors
        # of class "result__snippet" following their title.
        title_re = re.compile(r'<a[^>]+class="result__a"[^>]+href="([^"]+)"[^>]*>(.*?)</a>', re.I | re.S)
        snippet_re = re.compile(r'<a[^>]+class="result__snippet"[^>]*>(.*?)</a>', re.I | re.S)

        anchors = list(title_re.finditer(page))
        results = []
        for idx, m in enumerate(anchors):
            url = html.unescape(m.group(1))
            title = re.sub("<.*?>", "", html.unescape(m.group(2))).strip()
            # Look for the snippet only between this title and the next one.
            limit = anchors[idx + 1].start() if idx + 1 < len(anchors) else len(page)
            snip_m = snippet_re.search(page, m.end(), limit)
            snippet = ""
            if snip_m:
                snippet = re.sub("<.*?>", "", html.unescape(snip_m.group(1))).strip()
            if title and url:
                results.append({"title": title, "url": url, "snippet": snippet})
            if len(results) >= 8:
                break
        return {"ok": True, "query": query, "results": results}


class WebGet(Tool):
    """Tool downloading a web page and returning its cleaned plain text."""

    name = "web_get"
    description = "Télécharge une page web et renvoie un texte brut nettoyé (limité à ~50k chars)."
    inputs = {"url": {"type": "string", "description": "URL http(s) à lire."}}
    output_type = "object"
    requirements = ["requests", "beautifulsoup4"]

    def forward(self, url: str):
        """Return ``{"ok", "url", "text"}``; text is whitespace-collapsed and capped at 50000 chars."""
        try:
            r = requests.get(url, headers={"User-Agent": "insee-coicop-agent/1.0"}, timeout=20)
            if not r.ok:
                return {"ok": False, "status": r.status_code, "url": url, "text": ""}
            content = r.text
            try:
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(content, "html.parser")
                # Drop non-content elements.
                for tag in soup(["script", "style", "noscript", "header", "footer", "nav", "form", "aside"]):
                    tag.decompose()
                text_out = soup.get_text(separator=" ")
            except Exception:
                # Raw fallback (bs4 missing or failing): drop script/style
                # elements with their content, then every remaining tag.
                text_out = re.sub(r"<script.*?</script>|<style.*?</style>", " ", content, flags=re.S | re.I)
                text_out = re.sub(r"<[^>]+>", " ", text_out)
            text_out = re.sub(r"\s+", " ", text_out).strip()
            if len(text_out) > 50000:
                text_out = text_out[:50000]
            return {"ok": True, "url": url, "text": text_out}
        except Exception as e:
            return {"ok": False, "url": url, "error": str(e), "text": ""}


# ---- MergeCandidatesTool ----
class MergeCandidatesTool(Tool):
    """Tool merging several candidate lists into one list of ``min_k`` entries."""

    name = "merge_candidates"
    description = ("Fusionne des listes de candidats COICOP (dédupe par code, prend le score max, "
                   "agrège les justifs) et garantit min_k éléments avec padding neutre.")
    inputs = {
        "candidates_lists": {"type": "array", "description": "Liste de dicts {'candidates':[...]} venant d'autres outils."},
        "min_k": {"type": "integer", "description": "Taille minimale de la liste fusionnée (défaut 3).", "nullable": True},
        "fallback_bias": {"type": "string", "description": "Indice métier pour le padding (ex: 'cheese' ou '').", "nullable": True},
        "score_cap": {"type": "number", "description": "Clip des scores à [0, score_cap] (défaut 1.0).", "nullable": True},
    }
    output_type = "object"

    def forward(self, candidates_lists, min_k: int = 3, fallback_bias: str = "", score_cap: float = 1.0):
        """Merge, rank, pad and return ``{"candidates": [...]}``.

        The result holds exactly ``max(1, min_k)`` entries (``min_k`` of None or 0
        means 3): longer merged lists are truncated, shorter ones are padded.
        """
        if not isinstance(candidates_lists, list):
            return {"candidates": []}

        # Resolve the target size once so None never reaches a comparison.
        k = max(1, int(min_k or 3))
        is_cheese = (fallback_bias or "").lower() == "cheese"

        # 1) Collect: code -> {code, score, votes, why_list}
        bucket = {}
        for obj in candidates_lists:
            if not isinstance(obj, dict):
                continue
            for c in obj.get("candidates") or []:
                if not isinstance(c, dict):
                    continue
                code = c.get("code")
                if not code:
                    continue
                score = float(c.get("score", c.get("score_final", 0.0)))
                if score_cap is not None:
                    score = max(0.0, min(float(score_cap), score))
                why = c.get("why", "") or c.get("label", "")
                if code not in bucket:
                    bucket[code] = {"code": code, "score": score, "votes": 1, "why_list": [why] if why else []}
                else:
                    # Keep the best score, count the vote, accumulate reasons.
                    if score > bucket[code]["score"]:
                        bucket[code]["score"] = score
                    bucket[code]["votes"] += 1
                    if why:
                        bucket[code]["why_list"].append(why)

        # 2) Sort by score, then by votes
        merged = sorted(bucket.values(), key=lambda x: (x["score"], x["votes"]), reverse=True)

        # 3) Pad when fewer than k (the order is the same whatever the bias)
        if len(merged) < k:
            present = {m["code"] for m in merged}
            for code in ["01.1.4.5.2", "01.1.4.5.3", "01.1.4.5.5", "01.1.4.5.1", "01.1.4.5"]:
                if len(merged) >= k:
                    break
                if code in present:
                    continue
                merged.append({
                    "code": code,
                    "score": 0.5 if is_cheese else 0.48,
                    "votes": 0,
                    "why_list": ["padding fallback"],
                })
                present.add(code)

        # 4) Final shape, with a synthetic "why"
        out = []
        for m in merged[:k]:
            why = ", ".join(sorted({w for w in m.get("why_list", []) if w}))
            if not why:
                why = "fusion (pas d'explications)"
            out.append({"code": m["code"], "score": m["score"], "votes": m["votes"], "why": why})
        return {"candidates": out}


# ---- Resolve ----
class Resolve(Tool):
    """Tool turning merged candidates into a final choice plus alternatives."""

    name, description = "resolve_coicop_candidates", "Fusionne candidats → choix final + alternatives + explication."
    inputs = {"json_lists": {"type": "array", "description": "Liste de JSON (str/dict) d'autres tools."},
              "topn": {"type": "integer", "description": "Nb d'alternatives (défaut 3)", "nullable": True}}
    output_type = "object"

    def _fallback_min3(self):
        """Return three neutral, low-score fallback candidates (with labels)."""
        base = [
            {"code": "01.1.4.5.2", "label": CODE_TO_LABEL.get("01.1.4.5.2", ""), "score_final": 0.50, "votes": 0, "evidences": ["fallback (aucune évidence)"]},
            {"code": "01.1.4.5.3", "label": CODE_TO_LABEL.get("01.1.4.5.3", ""), "score_final": 0.49, "votes": 0, "evidences": ["fallback (aucune évidence)"]},
            {"code": "01.1.4.5.5", "label": CODE_TO_LABEL.get("01.1.4.5.5", ""), "score_final": 0.48, "votes": 0, "evidences": ["fallback (aucune évidence)"]},
        ]
        return base

    def forward(self, json_lists, topn: int = 3):
        """Return ``{"final", "alternatives", "candidates_top", "explanation"}``.

        *json_lists* is normally a list of ``{"candidates": [...]}`` dicts or JSON
        strings; a bare candidate list, a single dict/string or None is tolerated.
        """
        import json
        from typing import Dict, Any

        # Tolerate a single payload or nothing at all.
        if json_lists is None:
            json_lists = []
        elif isinstance(json_lists, (dict, str)):
            json_lists = [json_lists]

        # Tolerate a direct list of candidates.
        if isinstance(json_lists, list) and json_lists and isinstance(json_lists[0], dict) and "code" in json_lists[0]:
            json_lists = [{"candidates": json_lists}]

        bucket: Dict[str, Dict[str, Any]] = {}
        for s in json_lists:
            data = s
            if isinstance(s, str):
                try:
                    data = json.loads(s)
                except Exception:
                    data = {}
            if not isinstance(data, dict):
                continue
            for c in data.get("candidates") or []:
                if not isinstance(c, dict):
                    continue
                code = c.get("code")
                if not code:
                    continue
                score = float(c.get("score", c.get("score_final", 0.0)))
                why = c.get("why", "") or c.get("label", "")
                # Label from the mapping, else a label already carried by the candidate.
                label = CODE_TO_LABEL.get(code, c.get("label", ""))
                if code not in bucket:
                    bucket[code] = {
                        "code": code,
                        "label": label,
                        "score": score,
                        "votes": 1,
                        "evidences": [why] if why else [],
                    }
                else:
                    bucket[code]["score"] = max(bucket[code]["score"], score)
                    bucket[code]["votes"] += 1
                    if why:
                        bucket[code]["evidences"].append(why)
                    if not bucket[code].get("label"):
                        bucket[code]["label"] = label

        if not bucket:
            # Global fallback when nothing at all could be aggregated.
            ranked = self._fallback_min3()
            final = ranked[0]
            alts = ranked[1:]
            exp = "Aucun candidat issu des outils; retour d’un fallback générique (aucune évidence trouvée)."
            return {"final": final, "alternatives": alts, "candidates_top": ranked, "explanation": exp}

        # Each extra vote adds a small bonus to the best score.
        for v in bucket.values():
            v["score_final"] = v["score"] + 0.05 * (v["votes"] - 1)

        ranked = sorted(bucket.values(), key=lambda x: x["score_final"], reverse=True)

        # Merged top list: at least 3 entries.
        min_top = max(3, topn if isinstance(topn, int) and topn > 0 else 3)
        if len(ranked) < min_top:
            already = {r["code"] for r in ranked}
            for fb in self._fallback_min3():
                if len(ranked) >= min_top:
                    break
                if fb["code"] in already:
                    continue
                ranked.append(fb)

        final = ranked[0]
        alts = ranked[1:min_top]

        # Make sure labels are present (does not affect scoring).
        final.setdefault("label", CODE_TO_LABEL.get(final["code"], ""))
        for a in alts:
            a.setdefault("label", CODE_TO_LABEL.get(a["code"], ""))

        ev = final.get("evidences", [])
        exp = (
            f"Choix {final['code']} (score {final['score_final']:.2f}) – votes={final.get('votes',0)} – raisons: {', '.join(sorted(set(ev)))}"
            if ev else
            f"Choix {final['code']} (score {final['score_final']:.2f}) – fallback partiel."
        )

        candidates_top = []
        for r in ranked[:min_top]:
            r.setdefault("label", CODE_TO_LABEL.get(r["code"], ""))
            candidates_top.append(r)

        return {"final": final, "alternatives": alts, "candidates_top": candidates_top, "explanation": exp}


# ---- build_agent ----
def build_agent(model_id: str | None = None) -> CodeAgent:
    """Build a CodeAgent with all tools, using *model_id* or the preferred model."""
    mid = model_id or FALLBACK_MODELS[0]
    model = InferenceClientModel(
        model_id=mid,
        temperature=0.2,
        max_tokens=HF_MAX_TOKENS,
        timeout=HF_TIMEOUT,
        top_p=0.95,
    )
    agent = CodeAgent(
        tools=[ValidateEANTool(), OFFByEAN(), RegexCOICOP(), OFFtoCOICOP(), SemSim(),
               WebSearch(), WebGet(), MergeCandidatesTool(), Resolve()],
        model=model,
        add_base_tools=False,
        max_steps=AGENT_MAX_STEPS,
        verbosity_level=1,
    )
    return agent


# ---- run task with fallback ----
def run_task_with_fallback(task: str):
    """Run *task* with each configured model in turn and return the first result.

    When every model fails, a result-shaped dict listing the errors is
    returned instead of raising.
    """
    errors = []
    # dict.fromkeys drops duplicates (e.g. HF_MODEL_ID equal to a fallback)
    # while preserving order, so no model is retried twice.
    for mid in dict.fromkeys(m for m in FALLBACK_MODELS if m):
        try:
            agent = build_agent(mid)
            return agent.run(task)
        except Exception as e:
            # Record the failure and try the next model.
            errors.append(f"{mid}: {type(e).__name__}: {e}")
    return {
        "final": None,
        "alternatives": [],
        "candidates_top": [],
        "explanation": "LLM backend indisponible (timeouts).",
        "errors": errors,
    }


def parse_result(res):
    """Return *res* as a Python object.

    Dicts pass through. Strings are parsed as a Python literal first, then as
    JSON (which also handles ``null``/``true``/``false``). Anything that
    cannot be parsed is wrapped as ``{"raw": res}``.
    """
    if isinstance(res, dict):
        return res
    try:
        return ast.literal_eval(res)
    except Exception:
        pass
    if isinstance(res, str):
        try:
            return json.loads(res)
        except ValueError:
            pass
    return {"raw": res}


if __name__ == "__main__":
    ean = "3256221112345"  # made-up EAN
    label = "Les p'tits crémeux – Aldi – 216 g"

    task = f"""\
Classe ce produit en COICOP:
EAN: {ean}
Libellé: {label}

Outils autorisés :
- validate_ean
- openfoodfacts_product_by_ean
- map_off_to_coicop
- coicop_regex_rules
- coicop_semantic_similarity
- merge_candidates
- resolve_coicop_candidates
- python_interpreter # UNIQUEMENT pour lignes simples d’assignation ou d’appel d’outil

Règles STRICTES d’écriture de code :
- Aucune structure de contrôle Python : pas de if, else, for, while, try, with, def, class.
- Aucun print, aucun logging, aucune concaténation multi-ligne.
- Chaque bloc de code contient une seule instruction Python, sur une seule ligne.
- Commencer par définir deux variables :
1) EAN_STR = "{ean}"
2) LBL = \"\"\"{label}\"\"\"
- Pour tous les outils qui prennent le libellé, utiliser LBL.
- La fonction validate_ean renvoie un dictionnaire avec les clés 'valid' et 'normalized'. Ne pas la traiter comme un booléen directement.

Règles STRICTES de sortie :
- Terminer par un unique objet JSON valide en appelant final_answer avec cet objet.
- Ne pas ajouter de texte en dehors de l’objet JSON final.
- Ne pas utiliser de backticks.
- Le JSON final doit contenir les clés : final, alternatives, candidates_top, explanation.

Branchements (décision prise sans écrire de if en code) :
- MODE AVEC EAN si EAN_STR n’est pas "N/A" ET si validate_ean(EAN_STR) renvoie valid = True ET si l’appel OpenFoodFacts renvoie ok = True.
- Sinon, MODE SANS EAN.

Pipeline — MODE AVEC EAN :
1) v = validate_ean(EAN_STR)
2) off = openfoodfacts_product_by_ean(EAN_STR)
3) offmap = map_off_to_coicop(off_payload=off)
4) rx = coicop_regex_rules(text=LBL)
5) sem = coicop_semantic_similarity(text=LBL, topk=5)
6) merged = merge_candidates(candidates_lists=[offmap, rx, sem], min_k=3, fallback_bias="cheese")
7) res = resolve_coicop_candidates(json_lists=[merged], topn=3)
→ Appeler immédiatement final_answer avec res.

Pipeline — MODE SANS EAN :
1) rx = coicop_regex_rules(text=LBL)
2) sem = coicop_semantic_similarity(text=LBL, topk=5)
3) merged = merge_candidates(candidates_lists=[rx, sem], min_k=3, fallback_bias="cheese")
4) res = resolve_coicop_candidates(json_lists=[merged], topn=3)
→ Appeler immédiatement final_answer avec res.

Contraintes d’usage :
- Utiliser python_interpreter uniquement pour des lignes uniques d’assignation ou d’appel d’outil (ex: var = tool(args) ou tool(args)).
- Ne créer aucun fichier et ne faire aucune entrée/sortie externe.
"""

    out = run_task_with_fallback(task)
    print(parse_result(out))