# cv_parser/src/parser_flow/CV_agent_flow.py
"""
Orchestrateur CV enrichi avec 3 phases :
Phase 1 : DΓ©coupage du CV en sections (cv_splitter)
Phase 2 : Extraction parallèle (8 agents)
Phase 3a : Analyse d'en-tΓͺte (run_header_analysis) β€” tourne en // avec Phase 2
Phase 3b : Analyse & Recommandation — 3 agents en parallèle après Phase 2 + 3a
Flux optimisΓ© : Phase 1 β†’ (Phase 2 // Phase 3a) β†’ Phase 3b
Produit un JSON en 2 parties : candidat + recommandations.
"""
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

import yaml
from crewai import Agent, Task, Crew, Process

from src.config.app_config import get_small_llm, get_big_llm
# Module-level logger, standard `getLogger(__name__)` pattern.
logger = logging.getLogger(__name__)
#_____________________________________________________________________________________
class CVAgentOrchestrator:
    """Multi-agent orchestrator for CV parsing and analysis."""

    def __init__(self):
        # Small/fast model for extraction agents; the larger model is used by
        # the quality-check and project-analysis agents (see _create_agents).
        self.llm = get_small_llm()
        self.big_llm = get_big_llm()
        # YAML-driven agent and task definitions loaded from src/config/.
        self.agents_config = self._load_yaml("agents.yaml")
        self.tasks_config = self._load_yaml("tasks.yaml")
        # Job referential (with embeddings) used for job matching.
        self.metiers_data = self._load_metiers()
        self._create_agents()
def _load_yaml(self, filename: str) -> Dict:
base_path = os.path.dirname(os.path.dirname(__file__))
config_path = os.path.join(base_path, "config", filename)
with open(config_path, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
def _load_metiers(self) -> List[Dict]:
"""Charge le rΓ©fΓ©rentiel de mΓ©tiers (avec embeddings)."""
base_path = os.path.dirname(os.path.dirname(__file__))
metiers_path = os.path.join(base_path, "data", "metiers.json")
with open(metiers_path, "r", encoding="utf-8") as f:
data = json.load(f)
return data.get("metiers", [])
def _create_agents(self):
def make_agent(name, llm_override=None):
return Agent(
config=self.agents_config[name],
llm=llm_override or self.llm,
allow_delegation=False,
verbose=True,
max_iter=1,
respect_context_window=True,
)
self.cv_splitter = make_agent("cv_splitter")
self.skills_extractor = make_agent("skills_extractor")
self.experience_extractor = make_agent("experience_extractor")
self.project_extractor = make_agent("project_extractor")
self.education_extractor = make_agent("education_extractor")
self.reconversion_detector = make_agent("reconversion_detector")
self.language_extractor = make_agent("language_extractor")
self.etudiant_detector = make_agent("etudiant_detector")
self.identity_extractor = make_agent("identity_extractor")
self.header_analyzer = make_agent("header_analyzer")
self.metier_matcher = make_agent("metier_matcher")
self.cv_quality_checker = make_agent("cv_quality_checker", llm_override=self.big_llm)
self.project_analyzer = make_agent("project_analyzer", llm_override=self.big_llm)
# ──────────────────────────────────────────────
# PHASE 1 : DΓ©coupage du CV en sections
# ──────────────────────────────────────────────
async def split_cv_sections(self, cv_content: str, cv_raw_start: str = "") -> Dict[str, str]:
"""DΓ©coupe le CV en sections via l'agent cv_splitter."""
task_config = self.tasks_config["split_cv_task"].copy()
# Γ‰chapper les accolades dans le contenu CV pour Γ©viter les erreurs de format
safe_content = cv_content[:20000].replace("{", "{{").replace("}", "}}")
safe_raw = cv_raw_start[:2000].replace("{", "{{").replace("}", "}}")
task_config["description"] = task_config["description"].format(
cv_content=safe_content,
cv_raw_start=safe_raw,
)
task = Task(config=task_config, agent=self.cv_splitter)
crew = Crew(
agents=[self.cv_splitter],
tasks=[task],
process=Process.sequential,
verbose=False,
)
result = await crew.kickoff_async()
parsed = self._parse_json_output(result, default_structure={})
return parsed
# ──────────────────────────────────────────────
# PHASE 2 : Extraction et Analyse Parallèles
# ──────────────────────────────────────────────
async def run_all_agents(
self, sections: Dict[str, str], cv_raw_start: str = "", cv_full_text: str = "", file_name: str = "", page_count: int = 1
) -> Dict[str, Any]:
"""Exécute toutes les tÒches d'extraction et d'analyse en parallèle."""
raw_header = sections.get("header", "")
raw_experiences = sections.get("experiences", "")
raw_projects = sections.get("projects", "")
raw_skills = sections.get("skills", "")
raw_education = sections.get("education", "")
raw_languages = sections.get("languages", "")
safe_cv_raw = cv_raw_start[:2000].replace("{", "{{").replace("}", "}}")
safe_header = raw_header.replace("{", "{{").replace("}", "}}")
from src.services.metier_pre_filter import get_top_k_metiers
top_metiers = get_top_k_metiers(
metiers_data=self.metiers_data,
experiences_summary=raw_experiences[:2000],
projects_summary=raw_projects[:2000],
hard_skills=raw_skills[:2000],
soft_skills="",
k=3
)
metiers_reference = self._prepare_metiers_for_prompt(top_metiers)
def create_task_async(task_key, agent, **kwargs):
t_config = self.tasks_config[task_key].copy()
t_description = t_config["description"]
try:
t_config["description"] = t_description.format(**kwargs)
except KeyError as e:
logger.warning(f"KeyError formatting task '{task_key}': {e}. Falling back to manual replace.")
desc = t_description
for k, v in kwargs.items():
placeholder = "{" + k + "}"
if placeholder in desc:
desc = desc.replace(placeholder, str(v))
t_config["description"] = desc
except Exception as e:
logger.error(f"Unexpected error formatting task '{task_key}': {e}")
task = Task(config=t_config, agent=agent)
c = Crew(agents=[agent], tasks=[task], verbose=False)
return (task_key, c.kickoff_async())
tasks_def = [
("skills_task", self.skills_extractor, {"experiences": raw_experiences, "projects": raw_projects, "skills": raw_skills, "education": raw_education}),
("experience_task", self.experience_extractor, {"experiences": raw_experiences}),
("project_task", self.project_extractor, {"projects": raw_projects}),
("education_task", self.education_extractor, {"education": raw_education}),
("reconversion_task", self.reconversion_detector, {"experiences": raw_experiences, "education": raw_education}),
("language_task", self.language_extractor, {"languages": raw_languages, "cv_raw_start": cv_raw_start[:500]}),
("etudiant_task", self.etudiant_detector, {"education": raw_education, "current_date": datetime.now().strftime("%Y-%m-%d")}),
("identity_task", self.identity_extractor, {"header": raw_header, "cv_raw_start": cv_raw_start[:1500], "file_name": file_name}),
("poste_visΓ©_task", self.header_analyzer, {"header": safe_header, "cv_raw_start": safe_cv_raw}),
("cv_quality_task", self.cv_quality_checker, {
"header": safe_header,
"page_count": page_count,
"cv_full_text": cv_full_text[:6000],
"cv_raw_start": safe_cv_raw,
"skills": raw_skills[:2000],
"experiences": raw_experiences[:3000],
"projects": raw_projects[:2000],
"education": raw_education[:2000],
}),
("metier_matching_task", self.metier_matcher, {
"header": safe_header,
"skills": raw_skills[:2000],
"experiences": raw_experiences[:3000],
"projects": raw_projects[:2000],
"education": raw_education[:2000],
"metiers_reference": metiers_reference,
}),
("project_analysis_task", self.project_analyzer, {
"header": safe_header,
"projects": raw_projects[:3000],
}),
]
task_coroutines = [create_task_async(key, agent, **kwargs) for key, agent, kwargs in tasks_def]
keys = [t[0] for t in task_coroutines]
coroutines = [t[1] for t in task_coroutines]
results_list = await asyncio.gather(*coroutines, return_exceptions=True)
results_map = {}
for key, result in zip(keys, results_list):
if isinstance(result, Exception):
logger.error(f"Task '{key}' failed: {result}")
else:
results_map[key] = result
return self._build_final_json(results_map)
def _build_final_json(self, results_map: Dict[str, Any]) -> Dict[str, Any]:
"""Agrège les résultats de toutes les tÒches en un JSON final."""
def get_parsed(key, default=None):
if key not in results_map:
return default
return self._parse_json_output(results_map[key], default)
# Extraction
competences = get_parsed("skills_task", {"hard_skills": [], "soft_skills": []})
experiences = get_parsed("experience_task", [])
projets = get_parsed("project_task", {"professional": [], "personal": []})
formations = get_parsed("education_task", [])
reconversion = get_parsed("reconversion_task", {}).get("reconversion_analysis", {})
etudiant_data = get_parsed("etudiant_task", {}).get("etudiant_analysis", {})
latest_end_date = etudiant_data.get("latest_education_end_date")
if latest_end_date:
etudiant_data["is_etudiant"] = self._is_ongoing_date(latest_end_date)
is_en_poste = False
if isinstance(experiences, list):
for exp in experiences:
end_date = exp.get("end_date")
if isinstance(exp, dict) and end_date:
if self._is_ongoing_date(end_date):
is_en_poste = True
break
langues_raw = get_parsed("language_task", {})
identity = get_parsed("identity_task", {})
# Nettoyage des doublons dans hard_skills (case-insensitive)
if isinstance(competences, dict):
raw_skills = competences.get("hard_skills", [])
seen = set()
unique_skills = []
for skill in raw_skills:
key = str(skill).lower() if not isinstance(skill, str) else skill.lower()
if key not in seen:
seen.add(key)
unique_skills.append(skill)
competences["hard_skills"] = unique_skills
candidat = {
"first_name": identity.get("first_name") if isinstance(identity, dict) else None,
"langues": langues_raw.get("langues", []) if isinstance(langues_raw, dict) else [],
"compΓ©tences": competences,
"expΓ©riences": experiences,
"reconversion": reconversion,
"projets": projets,
"formations": formations,
"etudiant": etudiant_data,
"is_en_poste": is_en_poste,
}
# Analyse
header_data = get_parsed("poste_visΓ©_task", {"poste_vise": "Non identifiΓ©", "confiance": 0})
metier_data = get_parsed("metier_matching_task", {"postes_recommandes": []})
quality_data = get_parsed("cv_quality_task", {"score_global": 0, "red_flags": [], "conseils_prioritaires": []})
project_data = get_parsed("project_analysis_task", {"analyse_projets": []})
conseils = []
if isinstance(quality_data, dict):
conseils.extend(quality_data.get("conseils_prioritaires", []))
# Filtre de sΓ©curitΓ© : ne garder dans l'analyse de projets que ceux issus de l'extraction
extracted_titles: set[str] = set()
for p in (projets.get("professional", []) if isinstance(projets, dict) else []):
if isinstance(p, dict) and p.get("title"):
extracted_titles.add(p["title"].strip().lower())
for p in (projets.get("personal", []) if isinstance(projets, dict) else []):
if isinstance(p, dict) and p.get("title"):
extracted_titles.add(p["title"].strip().lower())
analyse_projets = project_data.get("analyse_projets", []) if isinstance(project_data, dict) else []
if extracted_titles and isinstance(analyse_projets, list):
def _is_extracted_project(titre: str) -> bool:
t = titre.strip().lower()
return t in extracted_titles or any(t in ref or ref in t for ref in extracted_titles)
analyse_projets = [p for p in analyse_projets if isinstance(p, dict) and _is_extracted_project(p.get("titre", ""))]
recommandations = {
"header_analysis": header_data,
"postes_recommandes": metier_data.get("postes_recommandes", []) if isinstance(metier_data, dict) else [],
"analyse_poste_vise": metier_data.get("analyse_poste_vise", "") if isinstance(metier_data, dict) else "",
"qualite_cv": quality_data,
"analyse_projets": analyse_projets,
"coherence_globale_projets": project_data.get("coherence_globale", {}) if isinstance(project_data, dict) else {},
"conseils_amelioration": conseils,
}
return {
"candidat": candidat,
"recommandations": recommandations
}
def _prepare_metiers_for_prompt(self, metiers: List[Dict] = None) -> str:
"""PrΓ©pare le rΓ©fΓ©rentiel mΓ©tiers restreint pour le prompt."""
if metiers is None:
metiers = self.metiers_data
flat_list = []
def _flatten(job_list):
for job in job_list:
if "metiers" in job:
_flatten(job["metiers"])
elif "id" in job:
flat_list.append(job)
_flatten(metiers)
lines = []
for m in flat_list:
mid = m.get("id", "?")
nom = m.get("nom", "?")
cat = m.get("categorie", "?")
comp = m.get("competences_techniques", [])
outils = m.get("outils_technologies", [])
soft = m.get("competences_soft", [])
niveau = m.get("niveau_etude", "?")
exp = m.get("experience_requise", "?")
lines.append(
f"[{mid}] {nom} ({cat})\n"
f" CompΓ©tences techniques: {', '.join(comp)}\n"
f" Outils: {', '.join(outils)}\n"
f" Soft skills: {', '.join(soft[:3])}\n"
f" Niveau: {niveau} | ExpΓ©rience: {exp}"
)
return "\n\n".join(lines)
# ──────────────────────────────────────────────
# Utilitaires
# ──────────────────────────────────────────────
def _is_ongoing_date(self, date_str: str) -> bool:
"""DΓ©termine si une date (fin d'Γ©tude ou fin d'expΓ©rience) est dans le futur ou en cours."""
if not date_str:
return False
date_str = str(date_str).lower().strip()
ongoing_keywords = [
"present", "prΓ©sent", "current", "cours", "aujourd'hui", "now"
]
if any(keyword in date_str for keyword in ongoing_keywords):
return True
try:
now = datetime.now()
end_date = None
if len(date_str) == 10 and date_str[4] == "-" and date_str[7] == "-":
end_date = datetime.strptime(date_str, "%Y-%m-%d")
elif len(date_str) == 7 and date_str[4] == "-":
end_date = datetime.strptime(date_str, "%Y-%m")
elif "/" in date_str:
parts = date_str.split("/")
if len(parts) == 2:
_, y = parts
if len(y) == 4:
end_date = datetime.strptime(date_str, "%m/%Y")
elif len(y) == 2:
end_date = datetime.strptime(date_str, "%m/%y")
elif len(date_str) == 4 and date_str.isdigit():
end_date = datetime.strptime(date_str, "%Y")
end_date = end_date.replace(month=12, day=31)
if end_date:
return end_date >= now
return False
except (ValueError, IndexError):
logger.warning(f"Date parsing failed for: {date_str}")
return False
def _parse_json_output(self, crew_output, default_structure=None) -> Any:
"""Parse la sortie JSON d'un agent CrewAI avec nettoyage robuste."""
if crew_output is None:
return default_structure if default_structure is not None else {}
raw = crew_output.raw if hasattr(crew_output, "raw") else str(crew_output)
# Extraire le bloc JSON si encapsulΓ© dans des backticks
if "```json" in raw:
raw = raw.split("```json")[1].split("```")[0].strip()
elif "```" in raw:
parts = raw.split("```")
if len(parts) >= 3:
raw = parts[1].strip()
raw = raw.strip().lstrip("\ufeff")
def _try_parse(text: str):
"""Tente un parse direct puis un parse avec extraction du premier bloc JSON."""
try:
return json.loads(text)
except json.JSONDecodeError:
pass
for start_char, end_char in [("{", "}"), ("[", "]")]:
start_idx = text.find(start_char)
end_idx = text.rfind(end_char)
if start_idx != -1 and end_idx > start_idx:
try:
return json.loads(text[start_idx : end_idx + 1])
except json.JSONDecodeError:
pass
return None
result = _try_parse(raw)
if result is not None:
return result
if "{{" in raw:
cleaned = raw.replace("{{", "{").replace("}}", "}")
result = _try_parse(cleaned)
if result is not None:
return result
logger.error(f"JSON Parse Error (after cleanup): {raw[:200]}")
return default_structure if default_structure is not None else {}