GAIA26CCPA / src /data_loader.py
JosephMcDonnell's picture
use glfi ecoalim (#17)
e2f537d
"""
data_loader.py - Chargement et indexation des bases de données EcoALIM, GFLI et PDF CIR.
"""
from __future__ import annotations
import logging
import json
import re
from functools import lru_cache
from typing import Dict, List, Optional, Tuple
from datasets import load_dataset,DownloadMode
import pandas as pd
import pdfplumber
import config
# ============================================================================
# EcoALIM
# ============================================================================
@lru_cache(maxsize=1)
def load_ecoalim() -> pd.DataFrame:
"""Charge la base EcoALIM (feuille FR) et renvoie un DataFrame nettoyé."""
def get_ecoalim_df() -> pd.DataFrame:
if config.IS_PRODUCTION:
print("#############")
ecoalim = load_dataset("CCPA-GAIA/ECOALIM",data_files="ecoalim.csv", token=config.HF_KEY,download_mode=DownloadMode.FORCE_REDOWNLOAD)
return ecoalim["train"].to_pandas()
return pd.read_excel(
config.ECOALIM_PATH,
sheet_name=config.ECOALIM_SHEET,
header=config.ECOALIM_HEADER_ROW,
)
df = get_ecoalim_df()
# Supprimer les lignes entièrement vides
df = df.dropna(subset=[config.ECOALIM_COL_NOM]).reset_index(drop=True)
# Normaliser les colonnes pays en minuscules pour faciliter la recherche
for col in [config.ECOALIM_COL_PAYS_PROD, config.ECOALIM_COL_PAYS_TRANSFO]:
if col in df.columns:
df[col] = df[col].astype(str).str.strip().str.lower()
return df
def _normalize_for_search(text: str) -> str:
"""Normalise un texte pour la recherche (accents, casse, ponctuation)."""
import unicodedata
text = text.lower().strip()
# Normalize unicode accents
nfkd = unicodedata.normalize('NFKD', text)
ascii_text = ''.join(c for c in nfkd if not unicodedata.combining(c))
return ascii_text
_STOPWORDS_FR = {
"de", "du", "des", "la", "le", "les", "d", "l", "a", "au", "aux"
}
def _tokens_for_search(text: str) -> list[str]:
"""Découpe un texte en tokens utiles pour une recherche souple."""
text = _normalize_for_search(text)
tokens = re.findall(r"[a-z0-9]+", text)
return [t for t in tokens if t and t not in _STOPWORDS_FR]
def is_name_match(matiere: str, intrant_name: str) -> bool:
"""
Vérifie si le nom de la matière est une correspondance réelle (mot entier)
dans le nom de l'intrant, et non un simple sous-chaîne accidentelle.
Ex : "blé" ne matche PAS "blend", mais matche "Blé tendre".
"""
mat_norm = _normalize_for_search(matiere)
int_norm = _normalize_for_search(intrant_name)
if mat_norm == int_norm:
return True
# Le mot de la matière doit apparaître comme mot entier dans l'intrant
pattern = r'\b' + re.escape(mat_norm) + r'\b'
return bool(re.search(pattern, int_norm))
def search_ecoalim(
matiere: str,
pays_production: Optional[str] = None,
pays_transformation: Optional[str] = None,
) -> pd.DataFrame:
"""
Cherche dans EcoALIM les lignes correspondant à une matière première.
Utilise une recherche intelligente avec priorisation :
1. Nom commence par la matière
2. Mot entier trouvé dans le nom
3. Contient la matière (substring)
Retourne un DataFrame filtré et trié par pertinence (peut être vide).
"""
df = load_ecoalim()
matiere_norm = _normalize_for_search(matiere)
# Build normalized column for search
nom_col = config.ECOALIM_COL_NOM
df_norms = df[nom_col].apply(lambda x: _normalize_for_search(str(x)) if pd.notna(x) else "")
# Create priority masks
mask_starts = df_norms.str.startswith(matiere_norm, na=False)
pattern_word = r'\b' + re.escape(matiere_norm) + r'\b'
mask_word = df_norms.str.contains(pattern_word, na=False, regex=True)
tokens = _tokens_for_search(matiere_norm)
mask_tokens = pd.Series(False, index=df.index)
if tokens:
mask_tokens = df_norms.apply(
lambda x: all(t in _tokens_for_search(x) for t in tokens)
)
mask_contains = df_norms.str.contains(re.escape(matiere_norm), na=False)
# Use best available mask with priority
if mask_starts.any():
mask = mask_starts
elif mask_word.any():
mask = mask_word
elif mask_tokens.any():
mask = mask_tokens
elif mask_contains.any():
mask = mask_contains
else:
return pd.DataFrame(columns=df.columns)
if pays_production:
pays_prod_low = pays_production.lower().strip()
mask_pays = df[config.ECOALIM_COL_PAYS_PROD].str.contains(
re.escape(pays_prod_low), na=False
)
combined = mask & mask_pays
if combined.any():
mask = combined
if pays_transformation:
pays_transfo_low = pays_transformation.lower().strip()
mask_transfo = df[config.ECOALIM_COL_PAYS_TRANSFO].str.contains(
re.escape(pays_transfo_low), na=False
)
combined = mask & mask_transfo
if combined.any():
mask = combined
result = df[mask].copy()
# Sort by relevance: entries starting with the search term come first
if not result.empty:
result_norms = result[nom_col].apply(lambda x: _normalize_for_search(str(x)))
result["_priority"] = 3
result.loc[result_norms.str.contains(pattern_word, na=False, regex=True), "_priority"] = 1
result.loc[result_norms.str.startswith(matiere_norm, na=False), "_priority"] = 0
result.loc[result_norms.apply(lambda x: all(t in _tokens_for_search(x) for t in tokens)), "_priority"] = 2
# Prefer OS outputs over champ when ties exist
result["_os_priority"] = 1
result.loc[result_norms.str.contains("sortie os", na=False), "_os_priority"] = 0
result = result.sort_values(["_priority", "_os_priority"]).drop(columns=["_priority", "_os_priority"])
return result
def get_ecoalim_climate_value(
matiere: str,
pays_production: Optional[str] = None,
pays_transformation: Optional[str] = None,
) -> Optional[Tuple[float, str, str]]:
"""
Retourne (valeur_kg_co2_eq, nom_intrant, source_info) ou None.
Unité EcoALIM : kg CO2 eq / kg de produit.
"""
results = search_ecoalim(matiere, pays_production, pays_transformation)
if results.empty:
return None
# Prendre la première correspondance (ou la plus défavorable si demandé)
row = results.iloc[0]
val = row.get(config.ECOALIM_COL_CLIMATE)
if pd.isna(val):
return None
nom = row.get(config.ECOALIM_COL_NOM, matiere)
return (float(val), str(nom), "ECOALIM")
def get_ecoalim_worst_value(matiere: str) -> Optional[Tuple[float, str, str]]:
"""Retourne la valeur la plus défavorable (max) pour cette matière dans EcoALIM."""
results = search_ecoalim(matiere)
if results.empty:
return None
climate_col = config.ECOALIM_COL_CLIMATE
results_valid = results.dropna(subset=[climate_col])
if results_valid.empty:
return None
idx = results_valid[climate_col].idxmax()
row = results_valid.loc[idx]
return (float(row[climate_col]), str(row[config.ECOALIM_COL_NOM]), "ECOALIM (valeur la plus défavorable)")
# ============================================================================
# GFLI
# ============================================================================
@lru_cache(maxsize=1)
def load_gfli() -> pd.DataFrame:
"""Charge la base GFLI (Economic allocation EF3.1)."""
def get_glfi_df() -> pd.DataFrame:
if config.IS_PRODUCTION:
glfi_dataset = load_dataset("CCPA-GAIA/ECOALIM",data_files="glfi.csv", token=config.HF_KEY,download_mode=DownloadMode.FORCE_REDOWNLOAD)
return glfi_dataset["train"].to_pandas()
return pd.read_excel(
config.GFLI_PATH,
sheet_name=config.GFLI_SHEET,
)
df = get_glfi_df()
df = df.dropna(subset=[config.GFLI_COL_PRODUCT]).reset_index(drop=True)
return df
def _extract_gfli_country(product_name: str) -> Optional[str]:
"""Extrait le code pays ISO d'un nom de produit GFLI (ex: '.../FR Economic S' -> 'FR')."""
m = re.search(r"/([A-Z]{2,3})\s+Economic\s+S", product_name)
return m.group(1) if m else None
def _extract_gfli_base_name(product_name: str) -> str:
"""Extrait le nom de base du produit GFLI (sans le code pays)."""
m = re.match(r"(.+)/[A-Z]{2,3}\s+Economic\s+S", product_name)
return m.group(1).strip() if m else product_name.strip()
def search_gfli(
matiere: str,
country_iso: Optional[str] = None,
) -> pd.DataFrame:
"""
Recherche dans GFLI par nom de matière (en anglais) et optionnellement par pays ISO.
Uses word-boundary matching for better precision.
"""
logging.info(f"Searching GLFI with args matiere: {matiere}, country_iso: {country_iso}")
df = load_gfli()
matiere_norm = _normalize_for_search(matiere)
prod_col = config.GFLI_COL_PRODUCT
df_norms = df[prod_col].apply(lambda x: _normalize_for_search(str(x)) if pd.notna(x) else "")
# Strategy 1: starts-with
mask = df_norms.str.startswith(matiere_norm, na=False)
# Strategy 2: word-boundary match
if not mask.any():
pattern_word = r'\b' + re.escape(matiere_norm) + r'\b'
mask = df_norms.str.contains(pattern_word, na=False, regex=True)
# Strategy 3: token-subset match (souple)
if not mask.any():
tokens = _tokens_for_search(matiere_norm)
if tokens:
mask = df_norms.apply(lambda x: all(t in _tokens_for_search(x) for t in tokens))
# Strategy 4: contains
if not mask.any():
mask = df_norms.str.contains(re.escape(matiere_norm), na=False)
if country_iso:
country_upper = country_iso.upper().strip()
mask_country = df[prod_col].str.contains(
rf"/{re.escape(country_upper)}\s+Economic\s+S", na=False, regex=True
)
# Filtrage strict : si un pays est demandé, ne retourner QUE les résultats de ce pays
mask = mask & mask_country
logging.info("Masked df: %s", df[mask].head())
return df[mask].copy()
def get_gfli_climate_value(
matiere: str,
country_iso: Optional[str] = None,
) -> Optional[Tuple[float, str, str]]:
"""
Retourne (valeur_kg_co2_eq_par_tonne, nom_produit, source_info) ou None.
Unité GFLI : kg CO2 eq / tonne de produit.
"""
results = search_gfli(matiere, country_iso)
if results.empty:
return None
row = results.iloc[0]
val = row.get(config.GFLI_COL_CLIMATE)
if pd.isna(val):
return None
nom = row.get(config.GFLI_COL_PRODUCT, matiere)
return (float(val), str(nom), "GFLI")
def get_gfli_worst_value(matiere: str) -> Optional[Tuple[float, str, str]]:
"""Retourne la valeur la plus défavorable (max) pour cette matière dans GFLI."""
results = search_gfli(matiere)
if results.empty:
return None
climate_col = config.GFLI_COL_CLIMATE
results_valid = results.dropna(subset=[climate_col])
if results_valid.empty:
return None
idx = results_valid[climate_col].idxmax()
row = results_valid.loc[idx]
return (float(row[climate_col]), str(row[config.GFLI_COL_PRODUCT]), "GFLI (valeur la plus défavorable)")
def get_gfli_rer_value(matiere: str) -> Optional[Tuple[float, str, str]]:
"""Retourne la valeur Mix Européen (RER) dans GFLI."""
return get_gfli_climate_value(matiere, "RER")
def get_gfli_glo_value(matiere: str) -> Optional[Tuple[float, str, str]]:
"""Retourne la valeur Mix Monde (GLO) dans GFLI."""
return get_gfli_climate_value(matiere, "GLO")
# ============================================================================
# GFLI - Listes utilitaires
# ============================================================================
def get_gfli_base_products() -> List[str]:
"""Retourne la liste des noms de base de produits uniques dans GFLI."""
df = load_gfli()
products = df[config.GFLI_COL_PRODUCT].dropna().unique()
base_names = set()
for p in products:
base_names.add(_extract_gfli_base_name(str(p)))
return sorted(base_names)
def get_ecoalim_matieres() -> List[str]:
"""Retourne la liste des matières premières uniques dans EcoALIM."""
df = load_ecoalim()
return sorted(df[config.ECOALIM_COL_NOM].dropna().unique().tolist())
# ============================================================================
# Fonctions multi-candidats (pour affichage comparatif)
# ============================================================================
def get_top_ecoalim_candidates(
matiere: str,
pays_production: Optional[str] = None,
pays_transformation: Optional[str] = None,
top_n: Optional[int] = 8,
) -> List[Dict]:
"""
Retourne les top N correspondances EcoALIM triées par pertinence,
chacune avec nom + valeur impact.
"""
results = search_ecoalim(matiere, pays_production, pays_transformation)
if results.empty:
return []
candidates = []
rows = results if top_n is None else results.head(top_n)
for _, row in rows.iterrows():
val = row.get(config.ECOALIM_COL_CLIMATE)
if pd.notna(val):
candidates.append({
"nom": str(row[config.ECOALIM_COL_NOM]),
"impact": float(val),
"unite": "kg CO2 eq / kg",
"source": "ECOALIM",
})
return candidates
def get_top_gfli_candidates(
matiere: str,
country_iso: Optional[str] = None,
top_n: Optional[int] = 8,
) -> List[Dict]:
"""
Retourne les top N correspondances GFLI triées par pertinence,
chacune avec nom + valeur impact.
"""
results = search_gfli(matiere, country_iso)
if results.empty:
return []
candidates = []
rows = results if top_n is None else results.head(top_n)
for _, row in rows.iterrows():
val = row.get(config.GFLI_COL_CLIMATE)
if pd.notna(val):
candidates.append({
"nom": str(row[config.GFLI_COL_PRODUCT]),
"impact": float(val),
"unite": "kg CO2 eq / tonne",
"source": "GFLI",
})
return candidates
# ============================================================================
# PDF CIR - Catalogue des Matières Premières
# ============================================================================
@lru_cache(maxsize=1)
def load_pdf_text() -> str:
"""Charge et retourne le texte complet du PDF CIR."""
full_text = []
if config.IS_PRODUCTION:
dataset = load_dataset(
"CCPA-GAIA/ECOALIM",
data_files=config.PDF_CIR_PATH.split("/")[-1],
token=config.HF_KEY
)
pdf = dataset["train"][0]["pdf"]
# Assuming this pdf object behaves like pdfplumber
for page in pdf.pages:
text = page.extract_text()
if text:
full_text.append(text)
else:
# Keep everything inside `with`
with pdfplumber.open(config.PDF_CIR_PATH) as pdf:
for page in pdf.pages:
text = page.extract_text()
if text:
full_text.append(text)
return "\n\n".join(full_text)
def get_pdf_excerpt(max_chars: int = 15000) -> str:
"""Retourne un extrait du PDF CIR (tronqué si nécessaire) pour envoi au LLM."""
text = load_pdf_text()
if len(text) > max_chars:
return text[:max_chars] + "\n... [texte tronqué]"
return text
# ============================================================================
# Logigramme
# ============================================================================
@lru_cache(maxsize=1)
def load_logigramme() -> dict:
"""Charge le logigramme JSON."""
with open(config.LOGIGRAMME_PATH, "r", encoding="utf-8") as f:
return json.load(f)