from __future__ import annotations
import json, re, unicodedata, ast, os
from typing import List, Dict, Any, Optional
import requests
from smolagents import Tool, CodeAgent, InferenceClientModel
from sentence_transformers import SentenceTransformer, util
# --- Runtime configuration read from the environment (defaults meant to be safe on a Space) ---
HF_TIMEOUT = int(os.environ.get("HF_TIMEOUT", "180"))  # 180 s rather than 60 s
HF_MAX_TOKENS = int(os.environ.get("HF_MAX_TOKENS", "384"))  # keep generations a bit shorter
AGENT_MAX_STEPS = int(os.environ.get("AGENT_MAX_STEPS", "6"))

# Order: the preferred model first (overridable through HF_MODEL_ID),
# then two fast, publicly available fallbacks.
FALLBACK_MODELS = [
    os.environ.get("HF_MODEL_ID") or "meta-llama/Meta-Llama-3.1-8B-Instruct",
    "Qwen/Qwen2.5-7B-Instruct",
    "HuggingFaceH4/zephyr-7b-beta",
]
# ---- Mini COICOP reference table (demo) ----
COICOP_ITEMS = [
    {"code": "01.1.4.5.1", "label": "Laits caillés, fromage blanc, petites crèmes fromagères"},
    {"code": "01.1.4.5.2", "label": "Fromage à pâte molle et à pâte persillée"},
    {"code": "01.1.4.5.3", "label": "Fromage à pâte pressée"},
    {"code": "01.1.4.5.4", "label": "Fromage de chèvre"},
    {"code": "01.1.4.5.5", "label": "Fromages fondus, râpés, portions"},
    {"code": "01.1.1.4", "label": "Pain"},
    {"code": "01.1.1.1", "label": "Riz"},
    {"code": "01.1.1.3", "label": "Pâtes, couscous et produits similaires"},
]

# Code -> label lookup. The generic cheese code is not part of COICOP_ITEMS,
# so it gets a label here unless the table already defines one.
CODE_TO_LABEL = dict((entry["code"], entry["label"]) for entry in COICOP_ITEMS)
if "01.1.4.5" not in CODE_TO_LABEL:
    CODE_TO_LABEL["01.1.4.5"] = "Fromages (générique)"
def normalize_txt(s: str) -> str:
    """Return *s* upper-cased, stripped of accents and reduced to A-Z, 0-9, '%' and single spaces.

    Falsy input (``None`` or an empty string) yields ``""``.
    """
    if not s:
        return ""
    decomposed = unicodedata.normalize("NFD", s.upper())
    # After NFD decomposition the accents are separate combining marks
    # (category "Mn"); dropping them leaves the bare base letters.
    unaccented = "".join(ch for ch in decomposed if unicodedata.category(ch) != "Mn")
    # Every run of other characters becomes one space, so only plain spaces
    # remain as separators and split/join collapses and trims them.
    spaced = re.sub(r"[^A-Z0-9% ]+", " ", unaccented)
    return " ".join(spaced.split())
def ean_check_digit_ok(ean: str) -> bool:
    """Return True when *ean* carries a valid GS1 check digit.

    Non-digit characters are ignored. The remaining digits must form an
    8-, 12-, 13- or 14-digit code; any other length, as well as ``None`` or
    an empty string, yields False instead of raising.
    """
    digits = re.sub(r"\D", "", ean or "")
    if len(digits) not in (8, 12, 13, 14):
        return False
    # Walking the payload (everything but the last digit) from right to left,
    # the weights alternate 3, 1, 3, 1, ... starting with 3.
    total = sum(
        int(ch) * (3 if pos % 2 == 0 else 1)
        for pos, ch in enumerate(reversed(digits[:-1]))
    )
    return (10 - total % 10) % 10 == int(digits[-1])
# ---- ValidateEANTool ----
class ValidateEANTool(Tool):
    """Tool that checks the GS1 check digit of an EAN/GTIN code."""

    name = "validate_ean"
    description = "Valide un EAN/GTIN (clé GS1)."
    inputs = {"ean": {"type": "string", "description": "Code EAN/GTIN (8/12/13/14 chiffres)."}}
    output_type = "object"

    def forward(self, ean: str):
        """Return ``{"valid": bool, "normalized": digits}`` for *ean*.

        ``normalized`` is the input with every non-digit removed; a code whose
        length is not 8, 12, 13 or 14 digits is reported as invalid.
        """
        normalized = re.sub(r"\D", "", ean or "")
        if len(normalized) not in (8, 12, 13, 14):
            return {"valid": False, "normalized": normalized}
        *payload, check_char = normalized
        # Weights alternate 3, 1, 3, ... from the rightmost payload digit.
        weighted = sum(
            int(ch) * (1 if idx % 2 else 3)
            for idx, ch in enumerate(reversed(payload))
        )
        expected = (10 - (weighted % 10)) % 10
        return {"valid": expected == int(check_char), "normalized": normalized}
# ---- OFFByEAN ----
class OFFByEAN(Tool):
    """Tool that looks a product up on Open Food Facts by its EAN.

    Three endpoints are tried in order (v0, v2 with an explicit field list,
    then the v0 endpoint on the ``.net`` host); the first one that returns a
    product wins.
    """

    name = "openfoodfacts_product_by_ean"
    description = "Open Food Facts /api/v0|v2/product/{ean} (name, brands, categories...)."
    inputs = {"ean": {"type": "string", "description": "EAN à interroger sur l'API OFF."}}
    output_type = "object"
    requirements = ["requests"]

    def forward(self, ean: str):
        """Fetch the product for *ean* and return a flattened description.

        Returns a dict with ``ok=True`` plus the extracted fields on success
        (including ``step3_inputs``, a subset of those fields), or
        ``{"ok": False, "status": 0, "code": ..., "error": ...}`` when the EAN
        is empty, no endpoint returned a product, or every request failed.
        """
        import re
        from requests.adapters import HTTPAdapter
        try:
            from urllib3.util.retry import Retry
        except Exception:
            # Retries are optional: without urllib3's Retry we just do single attempts.
            Retry = None

        def _to_list(x):
            # Normalize None / list / delimited string / scalar into a list of clean strings.
            if x is None:
                return []
            if isinstance(x, list):
                return [str(t).strip() for t in x if str(t).strip()]
            if isinstance(x, str):
                return [p.strip() for p in re.split(r"[,\|;]", x) if p.strip()]
            return [str(x).strip()]

        def _first(*vals):
            # First non-blank string among vals, stripped; "" when there is none.
            for v in vals:
                if isinstance(v, str) and v.strip():
                    return v.strip()
            return ""

        code = re.sub(r"\D", "", ean or "")
        if not code:
            return {"ok": False, "status": 0, "code": "", "error": "EAN vide"}

        urls = [
            f"https://world.openfoodfacts.org/api/v0/product/{code}.json",
            (
                "https://world.openfoodfacts.org/api/v2/product/"
                f"{code}?lc=fr&fields=code,product_name,product_name_fr,brands,labels_tags,"
                "categories_tags,categories_tags_fr,categories_hierarchy,ingredients,ingredients_text,"
                "ingredients_text_fr,ingredients_text_en,allergens,allergens_tags,traces,traces_tags,"
                "stores,status,status_verbose"
            ),
            f"https://world.openfoodfacts.net/api/v0/product/{code}.json",
        ]
        last_err = None

        # The context manager closes the session (and its connection pool)
        # on every exit path, including the early return on success.
        with requests.Session() as sess:
            sess.headers.update({"User-Agent": "insee-coicop-agent/1.0", "Accept": "application/json"})
            if Retry:
                retry = Retry(total=3, backoff_factor=0.5, status_forcelist=[429, 500, 502, 503, 504],
                              allowed_methods=frozenset(["GET"]), raise_on_status=False)
                sess.mount("https://", HTTPAdapter(max_retries=retry))

            for u in urls:
                try:
                    r = sess.get(u, timeout=15)
                    if not r.ok:
                        last_err = f"HTTP {r.status_code}"
                        continue
                    data = r.json()
                    product = data.get("product")
                    status = data.get("status", 1 if product else 0)
                    if not (status == 1 or product):
                        # This endpoint does not know the product; try the next one.
                        continue

                    p = product or {}
                    product_name = _first(p.get("product_name_fr"), p.get("product_name"))
                    categories_tags = _to_list(
                        p.get("categories_tags_fr") or p.get("categories_tags") or p.get("categories")
                    )
                    categories_hierarchy = _to_list(p.get("categories_hierarchy"))

                    # Ingredients: free text plus the structured list.
                    ingredients_text = _first(
                        p.get("ingredients_text_fr"), p.get("ingredients_text_en"), p.get("ingredients_text")
                    )
                    ingredients_list = []
                    if isinstance(p.get("ingredients"), list):
                        for it in p["ingredients"]:
                            if not isinstance(it, dict):
                                # A malformed entry must not discard the whole product.
                                continue
                            txt = it.get("text") or it.get("id") or ""
                            if txt:
                                ingredients_list.append(str(txt).strip())

                    allergens = _first(p.get("allergens"), None)
                    allergens_tags = _to_list(p.get("allergens_tags"))
                    traces = _first(p.get("traces"), None)  # e.g. "lait, noisettes"
                    traces_tags = _to_list(p.get("traces_tags"))
                    labels_tags = _to_list(p.get("labels_tags"))
                    brands = _first(p.get("brands"), None)
                    stores = _first(p.get("stores"), None)

                    return {
                        "ok": True, "status": status, "status_verbose": data.get("status_verbose"),
                        "code": code, "used_url": u,
                        "product_name": product_name,
                        "categories_tags": categories_tags,
                        "categories_hierarchy": categories_hierarchy,
                        "ingredients_text": ingredients_text,
                        "ingredients_list": ingredients_list,
                        "allergens": allergens,
                        "allergens_tags": allergens_tags,
                        "traces": traces,
                        "traces_tags": traces_tags,
                        "labels_tags": labels_tags,
                        "brands": brands, "brands_list": _to_list(brands),
                        "stores": stores, "stores_list": _to_list(stores),
                        # Inputs already shaped for step 3 (map_off_to_coicop).
                        "step3_inputs": {
                            "product_name": product_name,
                            "categories_tags": categories_tags,
                            "ingredients_text": ingredients_text,
                            "ingredients_list": ingredients_list,
                            "traces": traces,
                            "traces_tags": traces_tags,
                        },
                    }
                except Exception as e:
                    # Best effort: remember the failure and move on to the next endpoint.
                    last_err = str(e)

        return {"ok": False, "status": 0, "code": code, "error": last_err or "not found"}
# ---- RegexCOICOP ----
class RegexCOICOP(Tool):
    """Tool that derives COICOP candidates from a free-text label using keyword regexes."""

    name = "coicop_regex_rules"
    description = "Règles regex → candidats COICOP."
    inputs = {"text": {"type": "string", "description": "Libellé produit (texte libre) à analyser."}}
    output_type = "object"

    import re as _re

    # Keyword families; they are matched against the normalized label
    # (upper case, accents removed).
    SOFT = _re.compile(r"(?:\b|^)(?:CAMEMB(?:ERT)?|BRIE|COULOMMI(?:ERS?)?|BLEU|ROQUEFORT|GORGONZOLA|REBLOCHON|MUNSTER)(?:\b|$)")
    PRESS = _re.compile(r"(?:\b|^)(EMMENTAL|COMTE|CANTAL|MIMOLETTE|GOUDA|EDAM|BEAUFORT|ABONDANCE|SALERS|TOMME|TOME)(?:\b|$)")
    GOAT = _re.compile(r"(?:\b|^)(CHEVRE|STE MAURE|CROTTIN|BUCHE|PICODON|PELARDON|BANON)(?:\b|$)")
    PROC = _re.compile(r"(?:\b|^)(FONDU(?:ES?)?|FROMAGE FONDU|TOASTINETTES?|VACHE QUI RIT|KIRI|CARRE FRAIS|CARR[ÉE] FRAIS|PORTIONS?)(?:\b|$)|\bRAP[ÉE]?\b")

    @staticmethod
    def _normalize_txt(s: str) -> str:
        """Upper-case *s*, drop accents and keep only A-Z, 0-9, '%' and single spaces."""
        import unicodedata, re
        if not s:
            return ""
        decomposed = unicodedata.normalize("NFD", s.upper())
        bare = "".join(ch for ch in decomposed if unicodedata.category(ch) != "Mn")
        bare = re.sub(r"[^A-Z0-9% ]+", " ", bare)
        return re.sub(r"\s+", " ", bare).strip()

    def forward(self, text: str):
        """Return ``{"candidates": [...]}`` with one entry per keyword family found in *text*.

        The two generic rules (FROMAGE, then CREMEUX) are only consulted while
        no candidate has been found yet.
        """
        import re
        label = self._normalize_txt(text)
        family_rules = (
            (self.SOFT, "01.1.4.5.2", "pâte molle/persillée", 0.95),
            (self.PRESS, "01.1.4.5.3", "pâte pressée", 0.90),
            (self.GOAT, "01.1.4.5.4", "chèvre", 0.90),
            (self.PROC, "01.1.4.5.5", "fondu/râpé/portions", 0.85),
        )
        found = [
            {"code": code, "why": why, "score": score}
            for pattern, code, why, score in family_rules
            if pattern.search(label)
        ]
        if not found and re.search(r"\bFROMAGE\b", label):
            found.append({"code": "01.1.4.5", "why": "générique fromage/laits caillés", "score": 0.6})
        if not found and re.search(r"\bCR[ÉE]MEUX\b", label):
            found.append({"code": "01.1.4.5.1", "why": "mot-clé 'crémeux' (laits caillés/crèmes fromagères)", "score": 0.55})
        return {"candidates": found}
# ---- OFFtoCOICOP ----
class OFFtoCOICOP(Tool):
    """Tool that maps Open Food Facts product fields onto COICOP cheese codes.

    The fields can be passed individually, or as ``off_payload`` (the dict, or
    its string form, returned by the Open Food Facts lookup tool).
    """

    name, description = "map_off_to_coicop", "Mappe catégories OFF vers COICOP (off_payload ou champs séparés)."
    inputs = {
        "product_name": {"type": "string", "description": "Nom produit OFF (fr/en).", "nullable": True},
        "categories_tags": {"type": "array", "description": "Liste OFF categories_tags.", "nullable": True},
        "ingredients_text": {"type": "string", "description": "Texte ingrédients.", "nullable": True},
        "ingredients_list": {"type": "array", "description": "Liste structurée des ingrédients (strings).", "nullable": True},
        "traces": {"type": "string", "description": "Champ traces (fr).", "nullable": True},
        "traces_tags": {"type": "array", "description": "Tags de traces.", "nullable": True},
        # IMPORTANT: an object is allowed here (dict or string).
        "off_payload": {"type": "object", "description": "Sortie brute de l'étape 2 (dict OU string).", "nullable": True},
    }
    output_type = "object"

    # Kept as class attributes because the helpers reach them through ``self``.
    import re as _re, json as _json, ast as _ast

    def _normalize_txt(self, s: str) -> str:
        """Upper-case *s*, drop accents and keep only A-Z, 0-9, '%' and single spaces."""
        import unicodedata, re
        if not s:
            return ""
        s = s.upper()
        s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
        s = re.sub(r"[^A-Z0-9% ]+", " ", s)
        return re.sub(r"\s+", " ", s).strip()

    def _to_list(self, x):
        """Normalize None / list / delimited string / scalar into a list of clean strings."""
        import re
        if x is None:
            return []
        if isinstance(x, list):
            return [str(t).strip() for t in x if str(t).strip()]
        if isinstance(x, str):
            return [p.strip() for p in re.split(r"[,\|;]", x) if p.strip()]
        return [str(x).strip()]

    def _safe_parse(self, x):
        """Return *x* as a dict: accepted as-is, or parsed from JSON / a Python literal.

        Anything that does not end up being a dict (a JSON list, a number,
        unparsable text, ...) yields ``{}`` so callers can always use ``.get``.
        """
        if isinstance(x, dict):
            return x
        if not isinstance(x, str):
            return {}
        try:
            parsed = self._json.loads(x)
        except Exception:
            try:
                parsed = self._ast.literal_eval(x)
            except Exception:
                return {}
        return parsed if isinstance(parsed, dict) else {}

    # --- keyword families
    SOFT = _re.compile(r"\b(CAMEMBERT|BRIE|COULOMMIERS|BLUE CHEESE|ROQUEFORT|GORGONZOLA|MUNSTER|REBLOCHON)\b")
    PRESS = _re.compile(r"\b(EMMENTAL|COMTE|CANTAL|MIMOLETTE|GOUDA|EDAM|BEAUFORT|ABONDANCE|SALERS|TOMME|TOME)\b")
    GOAT = _re.compile(r"\b(CHEVRE|CH[ÈE]VRE|STE MAURE|CROTTIN|BUCHE|BUCHETTE|PICODON|PELARDON|BANON)\b")
    PROC = _re.compile(r"\b(FONDU|FONDUES?|RAPE|RÂPE|PORTIONS?|KIRI|VACHE QUI RIT|CARRE FRAIS|CARR[ÉE] FRAIS|TOASTINETTES?)\b")
    GENERIC_FROMAGE = _re.compile(r"\bFROMAGE[S]?\b")
    CREMEUX = _re.compile(r"\bCR[ÉE]MEUX\b")
    EN_CHEESE = _re.compile(r"\bCHEESE(S)?\b")

    # --- removal of "traces" clauses
    _TRACES_BLOCK = _re.compile(
        r"(PEUT\s+CONTENIR\s+DES\s+TRACES\s+DE\s+[^.;\)\]]+)|"
        r"(MAY\s+CONTAIN\s+TRACES\s+OF\s+[^.;\)\]]+)|"
        r"(\bTRACES?\s+DE\s+[^.;\)\]]+)",
        _re.I
    )

    def _without_traces(self, s: str) -> str:
        """Return *s* with every "may contain traces of ..." clause blanked out."""
        if not s:
            return ""
        return self._TRACES_BLOCK.sub(" ", s)

    def _mk(self, code, base, why, source):
        """Build a candidate whose score is *base* plus a small bonus depending on *source*."""
        boost = {"name": 0.05, "cat": 0.04, "ing_no_traces": 0.03, "ing": 0.01}.get(source, 0.0)
        return {"code": code, "score": round(base + boost, 4), "why": f"{why} (source:{source})"}

    def _pad_min3(self, ranked, hint_is_cheese=False):
        """Append fallback codes to *ranked* (in place) until it holds 3 entries; return the first 3."""
        # Deterministic padding so that at least 3 candidates exist, without duplicates.
        fallback_order = ["01.1.4.5.2", "01.1.4.5.3", "01.1.4.5.5", "01.1.4.5.1", "01.1.4.5"]
        present = {r["code"] for r in ranked}
        for code in fallback_order:
            if len(ranked) >= 3:
                break
            if code in present:
                continue
            why = "fallback générique fromage" if hint_is_cheese else "fallback faible (peu d'indices)"
            base = 0.52 if hint_is_cheese else 0.48
            ranked.append({"code": code, "score": base, "why": why})
            present.add(code)
        return ranked[:3]

    def forward(self, product_name=None, categories_tags=None, ingredients_text=None,
                ingredients_list=None, traces=None, traces_tags=None, off_payload=None):
        """Return ``{"candidates": [...]}`` holding the three best COICOP codes.

        Evidence is searched in the product name and categories (strongest),
        then in the ingredients with "traces" clauses removed, then in the raw
        ingredients (weakest). Per code the best score is kept; the list is
        padded with fallback codes when fewer than three were found.
        """
        # Hydrate from off_payload (dict OR string, including its step3_inputs)
        # only when no individual field was supplied.
        if off_payload and not (product_name or categories_tags or ingredients_text
                                or ingredients_list or traces or traces_tags):
            data = self._safe_parse(off_payload)
            step3 = data.get("step3_inputs")
            if not isinstance(step3, dict):
                step3 = {}
            product_name = data.get("product_name") or step3.get("product_name") or ""
            categories_tags = self._to_list(data.get("categories_tags") or step3.get("categories_tags"))
            ingredients_text = data.get("ingredients_text") or step3.get("ingredients_text") or ""
            # Same step3_inputs fallback as every other field.
            ingredients_list = self._to_list(data.get("ingredients_list") or step3.get("ingredients_list"))
            traces = data.get("traces") or step3.get("traces") or ""
            traces_tags = self._to_list(data.get("traces_tags") or step3.get("traces_tags"))

        # Normalizations
        name = self._normalize_txt(product_name or "")
        cats_raw = " ".join(self._to_list(categories_tags))
        cats = self._normalize_txt(cats_raw)
        ingt = self._normalize_txt(ingredients_text or "")
        # The traces clauses are cut on the raw text, before normalization
        # removes the punctuation that delimits them.
        ingt_no_tr = self._normalize_txt(self._without_traces(ingredients_text or ""))
        ing_list = [self._normalize_txt(x) for x in self._to_list(ingredients_list)]
        ing_join = " ".join(ing_list)
        ing_join_no_tr = self._normalize_txt(self._without_traces(ing_join))

        # Broad "this is cheese" hint
        hint_is_cheese = (
            bool(self.GENERIC_FROMAGE.search(name) or self.GENERIC_FROMAGE.search(cats) or self.EN_CHEESE.search(cats))
            or ("EN:CHEESES" in cats or "FR:FROMAGES" in cats or "FROMAGES" in cats)
        )

        c = []
        # 1) Product name & categories (strong)
        if self.SOFT.search(name) or self.SOFT.search(cats):
            c.append(self._mk("01.1.4.5.2", 0.90, "OFF: pâte molle/persillée", "name" if self.SOFT.search(name) else "cat"))
        if self.PRESS.search(name) or self.PRESS.search(cats):
            c.append(self._mk("01.1.4.5.3", 0.87, "OFF: pâte pressée", "name" if self.PRESS.search(name) else "cat"))
        if self.GOAT.search(name) or self.GOAT.search(cats):
            c.append(self._mk("01.1.4.5.4", 0.88, "OFF: chèvre", "name" if self.GOAT.search(name) else "cat"))
        if self.PROC.search(name) or self.PROC.search(cats):
            c.append(self._mk("01.1.4.5.5", 0.86, "OFF: fondu/râpé/portions", "name" if self.PROC.search(name) else "cat"))

        # 2) Ingredients WITHOUT the "traces" clauses (medium)
        if self.SOFT.search(ingt_no_tr) or self.SOFT.search(ing_join_no_tr):
            c.append(self._mk("01.1.4.5.2", 0.84, "Ingrédients (sans traces): pâte molle/persillée", "ing_no_traces"))
        if self.PRESS.search(ingt_no_tr) or self.PRESS.search(ing_join_no_tr):
            c.append(self._mk("01.1.4.5.3", 0.82, "Ingrédients (sans traces): pâte pressée", "ing_no_traces"))
        if self.GOAT.search(ingt_no_tr) or self.GOAT.search(ing_join_no_tr):
            c.append(self._mk("01.1.4.5.4", 0.83, "Ingrédients (sans traces): chèvre", "ing_no_traces"))
        if self.PROC.search(ingt_no_tr) or self.PROC.search(ing_join_no_tr):
            c.append(self._mk("01.1.4.5.5", 0.80, "Ingrédients (sans traces): fondu/râpé/portions", "ing_no_traces"))

        # 3) Raw ingredients (weak; deliberately no goat trigger here)
        if self.SOFT.search(ingt) or self.SOFT.search(ing_join):
            c.append(self._mk("01.1.4.5.2", 0.78, "Ingrédients: pâte molle/persillée", "ing"))
        if self.PRESS.search(ingt) or self.PRESS.search(ing_join):
            c.append(self._mk("01.1.4.5.3", 0.76, "Ingrédients: pâte pressée", "ing"))
        if self.PROC.search(ingt) or self.PROC.search(ing_join):
            c.append(self._mk("01.1.4.5.5", 0.74, "Ingrédients: fondu/râpé/portions", "ing"))

        # 4) Generic candidates when nothing specific matched
        if not c and (hint_is_cheese or self.GENERIC_FROMAGE.search(name)
                      or self.GENERIC_FROMAGE.search(cats) or self.CREMEUX.search(name)):
            # Offer the generic cheese code plus the two most likely families.
            c.extend([
                {"code": "01.1.4.5", "score": 0.62, "why": "OFF: générique fromage"},
                {"code": "01.1.4.5.2", "score": 0.60, "why": "fallback fromage (molle/persillée)"},
                {"code": "01.1.4.5.3", "score": 0.59, "why": "fallback fromage (pressée)"},
            ])

        # Deduplicate per code: keep the best score and collect every reason.
        bucket = {}
        for ci in c:
            code = ci["code"]
            if code not in bucket:
                bucket[code] = {**ci, "why_list": [ci.get("why", "")]}
            else:
                if ci["score"] > bucket[code]["score"]:
                    bucket[code].update({"score": ci["score"], "why": ci.get("why", "")})
                bucket[code]["why_list"].append(ci.get("why", ""))
        ranked = sorted(bucket.values(), key=lambda x: x["score"], reverse=True)

        # Pad up to 3 candidates when needed; the result is capped at 3.
        if len(ranked) < 3:
            ranked = self._pad_min3(ranked, hint_is_cheese=hint_is_cheese)
        return {"candidates": ranked[:3]}
# ---- SemSim ----
class SemSim(Tool):
    """Tool that ranks the COICOP reference labels by embedding similarity to a text."""

    name, description = "coicop_semantic_similarity", "Embeddings → top-k COICOP."
    inputs = {"text": {"type": "string", "description": "Texte libellé"},
              "topk": {"type": "integer", "description": "Nombre de candidats (défaut 5)", "nullable": True}}
    output_type = "object"
    requirements = ["sentence_transformers", "torch"]
    COICOP_ITEMS = COICOP_ITEMS

    @staticmethod
    def _normalize_txt(s: str) -> str:
        """Upper-case *s*, drop accents and keep only A-Z, 0-9, '%' and single spaces."""
        import unicodedata, re
        if not s:
            return ""
        s = s.upper()
        s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
        s = re.sub(r"[^A-Z0-9% ]+", " ", s)
        return re.sub(r"\s+", " ", s).strip()

    def forward(self, text: str, topk: int = 5):
        """Return ``{"candidates": [...]}``: the *topk* closest COICOP items (at least one).

        ``topk`` is declared nullable in ``inputs``, so ``None`` is treated as
        the default of 5 instead of failing in ``int(None)``.
        """
        # The model is loaded on first use only.
        if not hasattr(self, "_model"):
            self._model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

        labels = [f"{it['code']} {it['label']}" for it in self.COICOP_ITEMS]
        # The label embeddings only depend on the reference table: compute them
        # once and reuse them, re-encoding only if the labels have changed.
        cached = getattr(self, "_label_cache", None)
        if cached is None or cached[0] != labels:
            cached = (labels, self._model.encode(labels, normalize_embeddings=True))
            self._label_cache = cached
        label_emb = cached[1]

        q = self._normalize_txt(text)
        q_emb = self._model.encode([q], normalize_embeddings=True)
        sims = util.cos_sim(q_emb, label_emb).tolist()[0]

        ranked = sorted(
            (
                {"code": item["code"], "label": item["label"], "score": float(sim)}
                for item, sim in zip(self.COICOP_ITEMS, sims)
            ),
            key=lambda x: x["score"], reverse=True
        )
        k = 5 if topk is None else int(topk)
        return {"candidates": ranked[:max(1, k)]}
# ---- Web tools (recherche & lecture) ----
class WebSearch(Tool):
    """Tool that runs a query on the DuckDuckGo HTML endpoint and scrapes the result list."""

    name = "web_search"
    description = "Recherche web légère (DuckDuckGo HTML). Entrée: query (fr/en). Retour: top résultats avec titre, url, snippet."
    inputs = {"query": {"type": "string", "description": "Requête de recherche web."}}
    output_type = "object"
    requirements = ["requests"]

    def forward(self, query: str):
        """Return up to 8 results as ``{"ok", "query", "results": [{"title", "url", "snippet"}]}``.

        A network or HTTP failure yields ``{"ok": False, "error": ..., "results": []}``.
        """
        import html
        from urllib.parse import parse_qs, urlparse

        try:
            # The context manager releases the connection pool once the page is read.
            with requests.Session() as sess:
                sess.headers.update({"User-Agent": "insee-coicop-agent/1.0"})
                r = sess.get("https://duckduckgo.com/html/", params={"q": query, "kl": "fr-fr"}, timeout=15)
                r.raise_for_status()
                text = r.text
        except Exception as e:
            return {"ok": False, "error": str(e), "results": []}

        # Deliberately simple parsing, without a heavy dependency.
        # Each result title is an <a class="result__a" href="...">Title</a> anchor;
        # its snippet, when present, is a following element carrying the
        # class "result__snippet".
        link_re = re.compile(r'<a[^>]+class="result__a"[^>]+href="([^"]+)"[^>]*>(.*?)</a>', re.I | re.S)
        snippet_re = re.compile(r'<(?:a|div)[^>]+class="result__snippet"[^>]*>(.*?)</(?:a|div)>', re.I | re.S)

        def _strip_tags(fragment):
            return re.sub("<.*?>", "", html.unescape(fragment)).strip()

        def _unwrap(href):
            # Result links may go through a duckduckgo.com/l/?uddg=<target> redirect;
            # hand back the real target so the URL can be fetched directly.
            parsed = urlparse(href)
            if parsed.netloc.endswith("duckduckgo.com") and parsed.path.startswith("/l/"):
                target = parse_qs(parsed.query).get("uddg")
                if target:
                    return target[0]
            return href

        links = list(link_re.finditer(text))
        results = []
        for idx, m in enumerate(links):
            url = _unwrap(html.unescape(m.group(1)))
            title = _strip_tags(m.group(2))
            # Only look for the snippet between this title and the next one, so
            # a result without snippet does not pick up its neighbour's.
            limit = links[idx + 1].start() if idx + 1 < len(links) else len(text)
            snip_m = snippet_re.search(text, m.end(), limit)
            snippet = _strip_tags(snip_m.group(1)) if snip_m else ""
            if title and url:
                results.append({"title": title, "url": url, "snippet": snippet})
            if len(results) >= 8:
                break
        return {"ok": True, "query": query, "results": results}
class WebGet(Tool):
    """Tool that downloads a web page and returns its visible text."""

    name = "web_get"
    description = "Télécharge une page web et renvoie un texte brut nettoyé (limité à ~50k chars)."
    inputs = {"url": {"type": "string", "description": "URL http(s) à lire."}}
    output_type = "object"
    requirements = ["requests", "beautifulsoup4"]

    def forward(self, url: str):
        """Return ``{"ok": True, "url", "text"}`` with whitespace collapsed, capped at 50000 chars.

        A non-2xx response yields ``ok=False`` with its ``status``; any
        exception yields ``ok=False`` with an ``error`` message.
        """
        try:
            r = requests.get(url, headers={"User-Agent": "insee-coicop-agent/1.0"}, timeout=20)
            if not r.ok:
                return {"ok": False, "status": r.status_code, "url": url, "text": ""}
            content = r.text
            try:
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(content, "html.parser")
                # Drop scripts, styles and page chrome before extracting the text.
                for tag in soup(["script", "style", "noscript", "header", "footer", "nav", "form", "aside"]):
                    tag.decompose()
                text_out = soup.get_text(separator=" ")
            except Exception:
                # Crude fallback when BeautifulSoup is unavailable or fails:
                # remove whole <script>/<style> elements first (their content is
                # not page text), then every remaining tag.
                text_out = re.sub(r"<(script|style)\b[^>]*>.*?</\1\s*>", " ", content, flags=re.S | re.I)
                text_out = re.sub(r"<[^>]+>", " ", text_out)
            text_out = re.sub(r"\s+", " ", text_out).strip()
            if len(text_out) > 50000:
                text_out = text_out[:50000]
            return {"ok": True, "url": url, "text": text_out}
        except Exception as e:
            return {"ok": False, "url": url, "error": str(e), "text": ""}
# ---- MergeCandidatesTool ----
class MergeCandidatesTool(Tool):
    """Tool that merges several candidate lists into one ranked, deduplicated list."""

    name = "merge_candidates"
    description = ("Fusionne des listes de candidats COICOP (dédupe par code, prend le score max, "
                   "agrège les justifs) et garantit min_k éléments avec padding neutre.")
    inputs = {
        "candidates_lists": {"type": "array", "description": "Liste de dicts {'candidates':[...]} venant d'autres outils."},
        "min_k": {"type": "integer", "description": "Taille minimale de la liste fusionnée (défaut 3).", "nullable": True},
        "fallback_bias": {"type": "string", "description": "Indice métier pour le padding (ex: 'cheese' ou '').", "nullable": True},
        "score_cap": {"type": "number", "description": "Clip des scores à [0, score_cap] (défaut 1.0).", "nullable": True},
    }
    output_type = "object"

    # Neutral but reasonable padding order for cheeses.
    _FALLBACK_ORDER = ("01.1.4.5.2", "01.1.4.5.3", "01.1.4.5.5", "01.1.4.5.1", "01.1.4.5")

    def forward(self, candidates_lists, min_k: int = 3, fallback_bias: str = "", score_cap: float = 1.0):
        """Return ``{"candidates": [...]}`` with exactly ``min_k`` merged entries.

        Per code the highest score is kept, occurrences are counted as votes
        and the reasons are aggregated. The list is sorted by (score, votes),
        padded with fallback codes when shorter than ``min_k`` and cut to
        ``min_k``. ``min_k`` of ``None`` or 0 means 3. A non-list
        ``candidates_lists`` yields an empty candidate list.
        """
        # 1) Collection
        if not isinstance(candidates_lists, list):
            return {"candidates": []}

        # Resolve min_k once so the padding loop and the final slice agree
        # (and a nullable min_k=None can no longer reach a comparison).
        k = max(1, int(min_k or 3))
        is_cheese = (fallback_bias or "").lower() == "cheese"

        bucket = {}  # code -> {code, score, votes, why_list}
        for obj in candidates_lists:
            if not isinstance(obj, dict):
                continue
            for c in obj.get("candidates") or []:
                if not isinstance(c, dict):
                    continue
                code = c.get("code")
                if not code:
                    continue
                score = float(c.get("score", c.get("score_final", 0.0)))
                if score_cap is not None:
                    score = max(0.0, min(float(score_cap), score))
                why = c.get("why", "") or c.get("label", "")
                entry = bucket.get(code)
                if entry is None:
                    bucket[code] = {"code": code, "score": score, "votes": 1, "why_list": [why] if why else []}
                else:
                    # Keep the best score, count the vote, aggregate the reasons.
                    if score > entry["score"]:
                        entry["score"] = score
                    entry["votes"] += 1
                    if why:
                        entry["why_list"].append(why)

        # 2) Sort by score, then by votes
        merged = sorted(bucket.values(), key=lambda x: (x["score"], x["votes"]), reverse=True)

        # 3) Padding when fewer than k entries
        present = {m["code"] for m in merged}
        for code in self._FALLBACK_ORDER:
            if len(merged) >= k:
                break
            if code in present:
                continue
            merged.append({
                "code": code,
                "score": 0.5 if is_cheese else 0.48,
                "votes": 0,
                "why_list": ["padding fallback"],
            })
            present.add(code)

        # 4) Final shape, with one synthetic "why" string per entry
        out = []
        for m in merged[:k]:
            why = ", ".join(sorted({w for w in m.get("why_list", []) if w}))
            if not why:
                why = "fusion (pas d'explications)"
            out.append({"code": m["code"], "score": m["score"], "votes": m["votes"], "why": why})
        return {"candidates": out}
# ---- Resolve ----
class Resolve(Tool):
    """Tool that turns candidate lists into a final COICOP choice with alternatives."""

    name, description = "resolve_coicop_candidates", "Fusionne candidats → choix final + alternatives + explication."
    inputs = {"json_lists": {"type": "array", "description": "Liste de JSON (str/dict) d'autres tools."},
              "topn": {"type": "integer", "description": "Nb d'alternatives (défaut 3)", "nullable": True}}
    output_type = "object"

    def _fallback_min3(self):
        """Return three labelled low-score fallback candidates, in a neutral order."""
        base = [
            {"code": "01.1.4.5.2", "label": CODE_TO_LABEL.get("01.1.4.5.2", ""),
             "score_final": 0.50, "votes": 0, "evidences": ["fallback (aucune évidence)"]},
            {"code": "01.1.4.5.3", "label": CODE_TO_LABEL.get("01.1.4.5.3", ""),
             "score_final": 0.49, "votes": 0, "evidences": ["fallback (aucune évidence)"]},
            {"code": "01.1.4.5.5", "label": CODE_TO_LABEL.get("01.1.4.5.5", ""),
             "score_final": 0.48, "votes": 0, "evidences": ["fallback (aucune évidence)"]},
        ]
        return base

    def forward(self, json_lists, topn: int = 3):
        """Return ``{"final", "alternatives", "candidates_top", "explanation"}``.

        ``json_lists`` is normally a list of ``{"candidates": [...]}`` dicts or
        JSON strings. A bare list of candidates, a single dict and a single
        JSON string are tolerated too; ``None`` is treated as an empty list.
        Each code's final score is its best score plus 0.05 per extra vote.
        """
        import json
        from typing import Dict, Any

        # Tolerated input shapes: nothing, or one payload not wrapped in a list.
        if json_lists is None:
            json_lists = []
        elif isinstance(json_lists, (dict, str)):
            json_lists = [json_lists]
        # Tolerance for a direct list of candidates.
        if isinstance(json_lists, list) and json_lists and isinstance(json_lists[0], dict) and "code" in json_lists[0]:
            json_lists = [{"candidates": json_lists}]

        bucket: Dict[str, Dict[str, Any]] = {}
        for s in json_lists:
            data = s
            if isinstance(s, str):
                try:
                    data = json.loads(s)
                except Exception:
                    data = {}
            if not isinstance(data, dict):
                continue
            for c in data.get("candidates") or []:
                if not isinstance(c, dict):
                    continue
                code = c.get("code")
                if not code:
                    continue
                score = float(c.get("score", c.get("score_final", 0.0)))
                why = c.get("why", "") or c.get("label", "")
                # Label from the mapping, falling back on a label carried by the candidate.
                label = CODE_TO_LABEL.get(code, c.get("label", ""))
                if code not in bucket:
                    bucket[code] = {
                        "code": code,
                        "label": label,
                        "score": score,
                        "votes": 1,
                        "evidences": [why] if why else [],
                    }
                else:
                    bucket[code]["score"] = max(bucket[code]["score"], score)
                    bucket[code]["votes"] += 1
                    if why:
                        bucket[code]["evidences"].append(why)
                    # Fill in the label if it was missing so far.
                    if not bucket[code].get("label"):
                        bucket[code]["label"] = label

        if not bucket:
            # Global fallback when nothing at all could be aggregated (with labels).
            ranked = self._fallback_min3()
            final = ranked[0]
            alts = ranked[1:]
            exp = "Aucun candidat issu des outils; retour d’un fallback générique (aucune évidence trouvée)."
            return {"final": final, "alternatives": alts, "candidates_top": ranked, "explanation": exp}

        for v in bucket.values():
            v["score_final"] = v["score"] + 0.05 * (v["votes"] - 1)
        ranked = sorted(bucket.values(), key=lambda x: x["score_final"], reverse=True)

        # Merged top list: at least 3 entries.
        min_top = max(3, topn if isinstance(topn, int) and topn > 0 else 3)
        if len(ranked) < min_top:
            # Complete with the small fallback, without duplicating codes (with labels).
            already = {r["code"] for r in ranked}
            for fb in self._fallback_min3():
                if len(ranked) >= min_top:
                    break
                if fb["code"] in already:
                    continue
                ranked.append(fb)
                already.add(fb["code"])

        # Final selection
        final = ranked[0]
        alts = ranked[1:min_top]
        # Make sure a label is present (does not affect the scoring).
        final.setdefault("label", CODE_TO_LABEL.get(final["code"], ""))
        for a in alts:
            a.setdefault("label", CODE_TO_LABEL.get(a["code"], ""))

        ev = final.get("evidences", [])
        exp = (
            f"Choix {final['code']} (score {final['score_final']:.2f}) – votes={final.get('votes',0)} – raisons: {', '.join(sorted(set(ev)))}"
            if ev else
            f"Choix {final['code']} (score {final['score_final']:.2f}) – fallback partiel."
        )

        # candidates_top, labels guaranteed
        candidates_top = []
        for r in ranked[:min_top]:
            r.setdefault("label", CODE_TO_LABEL.get(r["code"], ""))
            candidates_top.append(r)
        return {"final": final, "alternatives": alts, "candidates_top": candidates_top, "explanation": exp}
# ---- build_agent ----
def build_agent(model_id: str | None = None) -> CodeAgent:
    """Create a CodeAgent wired with every tool of this module.

    Args:
        model_id: model to use; the first entry of FALLBACK_MODELS when omitted.
    """
    llm = InferenceClientModel(
        model_id=model_id or FALLBACK_MODELS[0],
        temperature=0.2,
        max_tokens=HF_MAX_TOKENS,
        timeout=HF_TIMEOUT,  # raised timeout
        top_p=0.95,
    )
    toolbox = [
        ValidateEANTool(),
        OFFByEAN(),
        RegexCOICOP(),
        OFFtoCOICOP(),
        SemSim(),
        WebSearch(),
        WebGet(),
        MergeCandidatesTool(),
        Resolve(),
    ]
    return CodeAgent(
        tools=toolbox,
        model=llm,
        add_base_tools=False,
        max_steps=AGENT_MAX_STEPS,  # fewer steps = fewer tokens / less latency
        verbosity_level=1,  # shorter logs = fewer outgoing tokens
    )
# ---- run task with fallback ----
def run_task_with_fallback(task: str):
    """Run *task* on each model of FALLBACK_MODELS in turn and return the first result.

    When every model raises, a result-shaped dict is returned instead of
    crashing; its ``errors`` key lists one message per failed model.
    """
    failures = []
    for model_id in filter(None, FALLBACK_MODELS):
        try:
            return build_agent(model_id).run(task)
        except Exception as exc:
            # Record the failure, then move on to the next model.
            failures.append(f"{model_id}: {type(exc).__name__}: {exc}")
    return {
        "final": None,
        "alternatives": [],
        "candidates_top": [],
        "explanation": "LLM backend indisponible (timeouts).",
        "errors": failures,
    }
def parse_result(res):
    """Return the agent output as a Python object.

    A dict is returned unchanged. Anything else is parsed as JSON first (the
    task asks for a JSON object, and ``null``/``true``/``false`` are not
    valid Python literals), then as a Python literal. Unparsable input is
    wrapped as ``{"raw": res}``.
    """
    if isinstance(res, dict):
        return res
    try:
        return json.loads(res)
    except (TypeError, ValueError):
        pass
    try:
        return ast.literal_eval(res)
    except Exception:
        return {"raw": res}
if __name__ == "__main__":
    # Demo: classify one product. No agent is built here because
    # run_task_with_fallback creates one per model it tries.
    ean = "3256221112345"  # fictitious EAN
    label = "Les p'tits crémeux – Aldi – 216 g"
    task = f"""\
Classe ce produit en COICOP:
EAN: {ean}
Libellé: {label}
Outils autorisés :
- validate_ean
- openfoodfacts_product_by_ean
- map_off_to_coicop
- coicop_regex_rules
- coicop_semantic_similarity
- merge_candidates
- resolve_coicop_candidates
- python_interpreter # UNIQUEMENT pour lignes simples d’assignation ou d’appel d’outil
Règles STRICTES d’écriture de code :
- Aucune structure de contrôle Python : pas de if, else, for, while, try, with, def, class.
- Aucun print, aucun logging, aucune concaténation multi-ligne.
- Chaque bloc de code contient une seule instruction Python, sur une seule ligne.
- Commencer par définir deux variables :
1) EAN_STR = "{ean}"
2) LBL = \"\"\"{label}\"\"\"
- Pour tous les outils qui prennent le libellé, utiliser LBL.
- La fonction validate_ean renvoie un dictionnaire avec les clés 'valid' et 'normalized'. Ne pas la traiter comme un booléen directement.
Règles STRICTES de sortie :
- Terminer par un unique objet JSON valide en appelant final_answer avec cet objet.
- Ne pas ajouter de texte en dehors de l’objet JSON final.
- Ne pas utiliser de backticks.
- Le JSON final doit contenir les clés : final, alternatives, candidates_top, explanation.
Branchements (décision prise sans écrire de if en code) :
- MODE AVEC EAN si EAN_STR n’est pas "N/A" ET si validate_ean(EAN_STR) renvoie valid = True ET si l’appel OpenFoodFacts renvoie ok = True.
- Sinon, MODE SANS EAN.
Pipeline — MODE AVEC EAN :
1) v = validate_ean(EAN_STR)
2) off = openfoodfacts_product_by_ean(EAN_STR)
3) offmap = map_off_to_coicop(off_payload=off)
4) rx = coicop_regex_rules(text=LBL)
5) sem = coicop_semantic_similarity(text=LBL, topk=5)
6) merged = merge_candidates(candidates_lists=[offmap, rx, sem], min_k=3, fallback_bias="cheese")
7) res = resolve_coicop_candidates(json_lists=[merged], topn=3)
→ Appeler immédiatement final_answer avec res.
Pipeline — MODE SANS EAN :
1) rx = coicop_regex_rules(text=LBL)
2) sem = coicop_semantic_similarity(text=LBL, topk=5)
3) merged = merge_candidates(candidates_lists=[rx, sem], min_k=3, fallback_bias="cheese")
4) res = resolve_coicop_candidates(json_lists=[merged], topn=3)
→ Appeler immédiatement final_answer avec res.
Contraintes d’usage :
- Utiliser python_interpreter uniquement pour des lignes uniques d’assignation ou d’appel d’outil (ex: var = tool(args) ou tool(args)).
- Ne créer aucun fichier et ne faire aucune entrée/sortie externe.
"""
    out = run_task_with_fallback(task)
    print(parse_result(out))