"""
Summarizer agent - document analysis and summarization.

Builds structured summaries and in-depth analyses of the extracted documents.
"""

import asyncio
import hashlib
import re
from datetime import datetime
from typing import List, Dict, Any, Optional

from src.agents.base_agent import BaseAgent
from src.models.document_models import Document, DocumentSummary, SummarizationOutput, KeyPoint, Citation
from src.models.state_models import AgentType
from src.services.llm_service import LLMManager, LLMError
from src.services.text_chunking import ChunkingManager, TextChunk
from src.core.logging import setup_logger
from config.prompts import SUMMARIZER_PROMPTS, SYSTEM_PROMPTS


class SummarizationInput:
    """Input payload for the Summarizer agent.

    Holds the documents to summarize together with the optional settings.
    Every setting missing from ``summary_options`` falls back to the default
    shown in ``__init__``.
    """

    def __init__(
        self,
        documents: List["Document"],
        summary_options: Optional[Dict[str, Any]] = None
    ):
        """Store the documents and resolve each option against its default.

        Args:
            documents: Documents to summarize.
            summary_options: Optional overrides. Recognised keys are
                ``include_sentiment``, ``include_citations``,
                ``max_key_points``, ``detailed_analysis``,
                ``chunk_large_docs`` and ``max_doc_size``.
        """
        self.documents = documents
        # ``None`` (or any falsy value) is normalised to an empty dict.
        self.summary_options = summary_options or {}

        options = self.summary_options
        self.include_sentiment = options.get('include_sentiment', True)
        self.include_citations = options.get('include_citations', True)
        self.max_key_points = options.get('max_key_points', 5)
        self.detailed_analysis = options.get('detailed_analysis', True)
        self.chunk_large_docs = options.get('chunk_large_docs', True)
        self.max_doc_size = options.get('max_doc_size', 8000)


class SummarizerAgent(BaseAgent):
    """
    Agent responsible for analysing and summarizing documents.

    Features:
    - Executive and detailed summaries
    - Extraction of key points and arguments
    - Sentiment and bias analysis
    - Long-document handling through chunking
    - Citations and important statistics
    - Credibility assessment

    NOTE(review): no method of this class reads
    ``SummarizationInput.include_citations``, so citation extraction does not
    appear to be implemented here.
    """

    def __init__(
        self,
        max_retries: int = 2,
        timeout: float = 600.0
    ):
        """Configure the agent and create its LLM and chunking managers.

        Args:
            max_retries: Forwarded to ``BaseAgent.__init__``.
            timeout: Forwarded to ``BaseAgent.__init__``.
        """
        # NOTE(review): the agent registers as AgentType.READER although it is
        # named "summarizer" - confirm this is the intended agent type.
        super().__init__(
            agent_type=AgentType.READER,
            name="summarizer",
            max_retries=max_retries,
            timeout=timeout
        )

        self.llm_manager = LLMManager()
        self.chunking_manager = ChunkingManager()

        # Maximum number of documents summarized at the same time.
        self.max_concurrent_summaries = 3
        # Documents longer than this many characters go through chunking.
        self.chunk_overlap_threshold = 6000

    def validate_input(self, input_data: SummarizationInput) -> bool:
        """
        Validate the input data for summarization.

        Args:
            input_data: Input holding the documents to summarize.

        Returns:
            True when there are between 1 and 20 documents and at least one
            of them has non-blank content; False otherwise (the reason is
            logged).
        """
        documents = input_data.documents

        if not documents:
            self.logger.error("Aucun document fourni pour la summarization")
            return False

        if len(documents) > 20:
            self.logger.error(f"Trop de documents ({len(documents)}), maximum 20")
            return False

        if not any(doc.content and doc.content.strip() for doc in documents):
            self.logger.error("Aucun document avec contenu valide")
            return False

        return True

    async def process(self, input_data: SummarizationInput) -> SummarizationOutput:
        """
        Summarize the documents, then compare the resulting summaries.

        Args:
            input_data: Input holding the documents to summarize.

        Returns:
            SummarizationOutput with every summary plus the global analysis.

        Raises:
            Exception: Any error raised along the way is logged and re-raised.
        """
        start_time = datetime.now()
        self.logger.info(f"Début summarization de {len(input_data.documents)} documents")

        # Documents without usable text are skipped instead of being sent on.
        valid_documents = [doc for doc in input_data.documents if doc.content and doc.content.strip()]
        self.logger.info(f"Documents valides à traiter: {len(valid_documents)}")

        try:
            summaries = await self._summarize_all_documents(valid_documents, input_data)
            global_analysis = await self._perform_global_analysis(summaries)

            total_processing_time = (datetime.now() - start_time).total_seconds()
            average_credibility = self._calculate_average_credibility(summaries)

            # total_documents counts every submitted document, including the
            # ones skipped above for having no content.
            result = SummarizationOutput(
                summaries=summaries,
                total_documents=len(input_data.documents),
                total_processing_time=total_processing_time,
                average_credibility=average_credibility,
                common_themes=global_analysis.get('common_themes', []),
                consensus_points=global_analysis.get('consensus_points', []),
                conflicting_views=global_analysis.get('conflicting_views', [])
            )

            self.logger.info(
                f"Summarization terminée: {len(summaries)} résumés créés en {total_processing_time:.2f}s"
            )

            return result

        except Exception as e:
            self.logger.error(f"Erreur lors de la summarization: {str(e)}")
            raise

    async def _summarize_all_documents(
        self,
        documents: List[Document],
        input_data: SummarizationInput
    ) -> List[DocumentSummary]:
        """Summarize all documents concurrently.

        At most ``self.max_concurrent_summaries`` documents are processed at
        once. A document whose summarization raises is replaced by an error
        summary instead of aborting the batch.
        """
        semaphore = asyncio.Semaphore(self.max_concurrent_summaries)

        async def summarize_single(doc: Document) -> DocumentSummary:
            async with semaphore:
                try:
                    return await self._summarize_document(doc, input_data)
                except Exception as e:
                    self.logger.error(f"Erreur résumé document {doc.title}: {e}")
                    return self._create_error_summary(doc, str(e))

        tasks = [summarize_single(doc) for doc in documents]
        summaries = await asyncio.gather(*tasks, return_exceptions=True)

        # With return_exceptions=True a task may yield an exception object;
        # keep only real summaries and log everything else.
        valid_summaries = []
        for summary in summaries:
            if isinstance(summary, DocumentSummary):
                valid_summaries.append(summary)
            else:
                self.logger.error(f"Résumé invalide: {summary}")

        return valid_summaries

    async def _summarize_document(self, document: Document, input_data: SummarizationInput) -> DocumentSummary:
        """Summarize one document and stamp the summary with id and timings."""
        start_time = datetime.now()
        doc_id = self._generate_document_id(document)

        self.logger.info(f"Résumé document: {document.title} ({len(document.content)} caractères)")

        # NOTE(review): the size limit comes from self.chunk_overlap_threshold;
        # input_data.max_doc_size is never consulted - confirm which is meant.
        if (input_data.chunk_large_docs and
                len(document.content) > self.chunk_overlap_threshold):
            summary = await self._summarize_large_document(document, input_data)
        else:
            summary = await self._summarize_standard_document(document, input_data)

        processing_time = (datetime.now() - start_time).total_seconds()
        summary.document_id = doc_id
        summary.processing_time = processing_time
        summary.processed_at = datetime.now()

        return summary

    async def _summarize_standard_document(
        self,
        document: Document,
        input_data: SummarizationInput
    ) -> DocumentSummary:
        """Summarize a standard-size document.

        The executive summary is always requested; the detailed analysis and
        the sentiment analysis are requested only when enabled in
        ``input_data``. All requests run concurrently.
        """
        context = {
            'title': document.title,
            'author': document.author or "Non spécifié",
            'url': str(document.url),
            'content': document.content
        }

        tasks = []

        exec_prompt = SUMMARIZER_PROMPTS['executive_summary'].format(**context)
        tasks.append(self._get_llm_response(exec_prompt, "executive_summary"))

        if input_data.detailed_analysis:
            detailed_prompt = SUMMARIZER_PROMPTS['detailed_analysis'].format(**context)
            tasks.append(self._get_llm_response(detailed_prompt, "detailed_analysis"))

        if input_data.include_sentiment:
            sentiment_prompt = SUMMARIZER_PROMPTS['sentiment_analysis'].format(**context)
            tasks.append(self._get_llm_response(sentiment_prompt, "sentiment_analysis"))

        results = await asyncio.gather(*tasks, return_exceptions=True)

        executive_summary = ""
        detailed_summary = ""
        key_points = []
        sentiment = None
        credibility_score = None

        # NOTE(review): _get_llm_response reports LLM failures as an
        # "Erreur: ..." content string rather than raising, so that text is
        # parsed below like a normal answer - confirm this is acceptable.
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                self.logger.error(f"Erreur tâche {i}: {result}")
                continue

            task_type, content = result

            if task_type == "executive_summary":
                executive_summary = content
            elif task_type == "detailed_analysis":
                parsed = self._parse_detailed_analysis(content)
                detailed_summary = parsed.get('summary', content)
                key_points = parsed.get('key_points', [])
            elif task_type == "sentiment_analysis":
                parsed = self._parse_sentiment_analysis(content)
                sentiment = parsed.get('sentiment')
                credibility_score = parsed.get('credibility_score')

        # document_id is filled in by the caller (_summarize_document).
        summary = DocumentSummary(
            document_id="",
            title=document.title,
            url=document.url,
            executive_summary=executive_summary,
            detailed_summary=detailed_summary,
            key_points=key_points[:input_data.max_key_points],
            sentiment=sentiment,
            credibility_score=credibility_score
        )

        return summary

    async def _summarize_large_document(
        self,
        document: Document,
        input_data: SummarizationInput
    ) -> DocumentSummary:
        """Summarize a long document: chunk it, summarize chunks, synthesize."""
        self.logger.info(f"Chunking document long: {document.title}")

        chunks = self.chunking_manager.chunk_document(
            document.content,
            strategy="default",
            preserve_structure=True
        )

        self.logger.info(f"Document découpé en {len(chunks)} chunks")

        chunk_summaries = await self._summarize_chunks(chunks, document)
        synthesis = await self._synthesize_chunk_summaries(chunk_summaries, document)

        return synthesis

    async def _summarize_chunks(self, chunks: List[TextChunk], document: Document) -> List[str]:
        """Summarize every chunk concurrently.

        A chunk whose LLM call fails contributes a placeholder error string
        instead of raising, so the result always has one entry per chunk.
        """
        async def summarize_chunk(chunk: TextChunk) -> str:
            context = {
                'title': document.title,
                'chunk_index': chunk.chunk_id,
                'total_chunks': chunk.total_chunks,
                'chunk_content': chunk.content
            }
            prompt = SUMMARIZER_PROMPTS['chunked_summary'].format(**context)
            try:
                return await self.llm_manager.get_completion(
                    prompt,
                    system_prompt=SYSTEM_PROMPTS['summarizer']
                )
            except Exception as e:
                self.logger.error(f"Erreur résumé chunk {chunk.chunk_id}: {e}")
                return f"Erreur résumé chunk {chunk.chunk_id}"

        tasks = [summarize_chunk(chunk) for chunk in chunks]
        summaries = await asyncio.gather(*tasks)
        return summaries

    async def _synthesize_chunk_summaries(
        self,
        chunk_summaries: List[str],
        document: Document
    ) -> DocumentSummary:
        """Merge the chunk summaries into one unified summary.

        If the synthesis LLM call (or the construction of the summary) fails,
        a basic summary built directly from the chunk summaries is returned.
        """
        combined_summaries = "\n\n".join([
            f"Partie {i+1}: {summary}"
            for i, summary in enumerate(chunk_summaries)
        ])

        context = {
            'partial_summaries': combined_summaries,
            'title': document.title,
            'url': str(document.url)
        }

        synthesis_prompt = SUMMARIZER_PROMPTS['synthesis'].format(**context)

        try:
            synthesis_result = await self.llm_manager.get_completion(
                synthesis_prompt,
                system_prompt=SYSTEM_PROMPTS['summarizer']
            )

            parsed = self._parse_synthesis_result(synthesis_result)

            # document_id is filled in by the caller (_summarize_document).
            summary = DocumentSummary(
                document_id="",
                title=document.title,
                url=document.url,
                executive_summary=parsed.get('executive_summary', ''),
                detailed_summary=parsed.get('detailed_summary', ''),
                key_points=parsed.get('key_points', []),
                sentiment=parsed.get('sentiment'),
                credibility_score=parsed.get('credibility_score')
            )

            return summary

        except Exception as e:
            self.logger.error(f"Erreur synthèse finale: {e}")
            return self._create_basic_summary_from_chunks(chunk_summaries, document)

    async def _get_llm_response(self, prompt: str, task_type: str) -> tuple:
        """Run one LLM completion and tag the answer with its task type.

        Returns:
            ``(task_type, response)``. On failure the error is logged and
            ``(task_type, "Erreur: <message>")`` is returned instead of
            raising.
        """
        try:
            response = await self.llm_manager.get_completion(
                prompt,
                system_prompt=SYSTEM_PROMPTS['summarizer'],
                temperature=0.3,
                max_tokens=2000
            )
            return task_type, response
        except Exception as e:
            self.logger.error(f"Erreur LLM pour {task_type}: {e}")
            return task_type, f"Erreur: {str(e)}"

    def _parse_detailed_analysis(self, content: str) -> Dict[str, Any]:
        """Extract key points from the bullet lines of a detailed analysis.

        Lines starting with ``-``, ``•`` or ``* `` are bullets. A bullet
        becomes a key point when its text is longer than 10 characters.

        Returns:
            ``{'summary': <content unchanged>, 'key_points': [KeyPoint, ...]}``.
        """
        # "*" must be followed by whitespace so "**bold**" is not a bullet.
        key_point_pattern = re.compile(r'^(?:[-•]\s*|\*\s+)(.+)$')

        key_points = []
        for line in content.split('\n'):
            match = key_point_pattern.match(line.strip())
            if not match:
                continue

            point_text = match.group(1).strip()
            if len(point_text) <= 10:
                continue

            # Titles are capped at 50 characters plus an ellipsis.
            title = point_text[:50] + "..." if len(point_text) > 50 else point_text
            key_points.append(
                KeyPoint(
                    title=title,
                    content=point_text,
                    importance=0.8,
                    category="general"
                )
            )

        return {'summary': content, 'key_points': key_points}

    def _parse_sentiment_analysis(self, content: str) -> Dict[str, Any]:
        """Extract a sentiment label and a credibility score in [0, 1].

        Sentiment is a keyword heuristic: ``'positif'`` is checked first,
        then ``'négatif'``, otherwise ``'neutre'``.

        The credibility score is taken from the first of these forms found in
        the text and normalised by its own scale:

        - ``crédibilité: X/N`` or ``crédibilité: X%``
        - ``crédibilité: X`` (scale inferred: <=1 as is, <=10 out of 10,
          otherwise out of 100)
        - ``X/N`` with N in 1, 5, 10, 100
        - ``X%``

        Returns:
            Dict with ``'sentiment'`` and ``'credibility_score'``; the score
            defaults to 0.5 when nothing parseable is found.
        """
        result: Dict[str, Any] = {}
        content_lower = content.lower()

        if 'positif' in content_lower:
            result['sentiment'] = 'positif'
        elif 'négatif' in content_lower:
            result['sentiment'] = 'négatif'
        else:
            result['sentiment'] = 'neutre'

        number = r'\d+(?:\.\d+)?'
        credibility_pattern = (
            rf'crédibilité\s*:?\s*(?P<labelled>{number})'
            rf'(?:\s*/\s*(?P<labelled_scale>{number})|\s*(?P<labelled_pct>%))?'
            # (?!\d) rejects denominators such as the "12" in "3/12".
            rf'|(?P<ratio>{number})\s*/\s*(?P<ratio_scale>100|10|5|1)(?!\d)'
            rf'|(?P<percent>{number})\s*%'
        )

        match = re.search(credibility_pattern, content_lower)
        if match is None:
            result['credibility_score'] = 0.5
            return result

        if match.group('labelled') is not None:
            value = float(match.group('labelled'))
            if match.group('labelled_scale') is not None:
                scale = float(match.group('labelled_scale'))
            elif match.group('labelled_pct') is not None:
                scale = 100.0
            elif value <= 1:
                scale = 1.0
            elif value <= 10:
                scale = 10.0
            else:
                scale = 100.0
        elif match.group('ratio') is not None:
            value = float(match.group('ratio'))
            scale = float(match.group('ratio_scale'))
        else:
            value = float(match.group('percent'))
            scale = 100.0

        # A zero denominator ("5/0") cannot be normalised; use the default.
        score = value / scale if scale > 0 else 0.5
        result['credibility_score'] = min(max(score, 0.0), 1.0)
        return result

    def _parse_synthesis_result(self, content: str) -> Dict[str, Any]:
        """Turn the raw synthesis text into summary fields.

        The executive summary is the first 200 characters (plus an ellipsis
        when truncated). Key points, sentiment and credibility score are
        fixed placeholder values, not derived from ``content``.
        """
        return {
            'executive_summary': content[:200] + "..." if len(content) > 200 else content,
            'detailed_summary': content,
            'key_points': [],
            'sentiment': 'neutre',
            'credibility_score': 0.7
        }

    def _create_basic_summary_from_chunks(
        self,
        chunk_summaries: List[str],
        document: Document
    ) -> DocumentSummary:
        """Build a fallback summary by joining the chunk summaries."""
        combined = " ".join(chunk_summaries)

        return DocumentSummary(
            document_id="",
            title=document.title,
            url=document.url,
            executive_summary=combined[:200] + "..." if len(combined) > 200 else combined,
            detailed_summary=combined,
            key_points=[],
            sentiment="neutre",
            credibility_score=0.5
        )

    def _create_error_summary(self, document: Document, error: str) -> DocumentSummary:
        """Build a minimal summary recording that summarization failed."""
        return DocumentSummary(
            document_id=self._generate_document_id(document),
            title=document.title,
            url=document.url,
            executive_summary=f"Erreur lors du résumé: {error}",
            detailed_summary=f"Le résumé de ce document n'a pas pu être généré: {error}",
            key_points=[],
            sentiment=None,
            credibility_score=None
        )

    def _generate_document_id(self, document: Document) -> str:
        """Derive a document id from the MD5 of its URL and title.

        The id is ``doc_`` followed by the first 8 hex digits of the hash, so
        two documents with the same URL and title get the same id.
        """
        content_hash = hashlib.md5(f"{document.url}{document.title}".encode()).hexdigest()
        return f"doc_{content_hash[:8]}"

    async def _perform_global_analysis(self, summaries: List[DocumentSummary]) -> Dict[str, List[str]]:
        """Ask the LLM to compare all summaries.

        Returns:
            Dict with ``common_themes``, ``consensus_points`` and
            ``conflicting_views``. All three are empty when fewer than two
            summaries are given or when the LLM call fails.
        """
        if len(summaries) < 2:
            return {'common_themes': [], 'consensus_points': [], 'conflicting_views': []}

        all_summaries = "\n\n".join([
            f"Document: {s.title}\nRésumé: {s.detailed_summary}"
            for s in summaries
        ])

        global_prompt = (
            "\n"
            "Analyse les résumés de documents suivants et identifie:\n"
            "\n"
            "1. **Thèmes communs** : Les sujets qui reviennent dans plusieurs documents\n"
            "2. **Points de consensus** : Les idées sur lesquelles les sources s'accordent\n"
            "3. **Points conflictuels** : Les idées contradictoires entre les sources\n"
            "\n"
            "RÉSUMÉS:\n"
            f"{all_summaries}\n"
            "\n"
            "Format ta réponse avec des sections claires et des listes à puces.\n"
        )

        try:
            response = await self.llm_manager.get_completion(
                global_prompt,
                system_prompt="Tu es un expert en analyse comparative de documents."
            )

            return self._parse_global_analysis(response)

        except Exception as e:
            self.logger.error(f"Erreur analyse globale: {e}")
            return {'common_themes': [], 'consensus_points': [], 'conflicting_views': []}

    def _parse_global_analysis(self, content: str) -> Dict[str, List[str]]:
        """Split the global analysis into its three bullet lists.

        A non-bullet line containing a section keyword switches the current
        section. Bullet lines (``-``, ``•`` or ``* ``) are stored as points
        of the current section when longer than 5 characters. Bullets are
        recognised before keywords, so a point that mentions "thème" or
        "consensus" is kept. A bullet is treated as a header only when it
        looks like one: it ends with ``:`` or is entirely bold.
        """
        result: Dict[str, List[str]] = {
            'common_themes': [],
            'consensus_points': [],
            'conflicting_views': []
        }

        section_keywords = (
            ('common_themes', ('thème', 'theme')),
            ('consensus_points', ('consensus',)),
            ('conflicting_views', ('conflict', 'contradictoire')),
        )

        def detect_section(text: str) -> Optional[str]:
            lowered = text.lower()
            for section, keywords in section_keywords:
                if any(keyword in lowered for keyword in keywords):
                    return section
            return None

        current_section = None

        for raw_line in content.split('\n'):
            line = raw_line.strip()
            if not line:
                continue

            # "*" counts as a bullet only when followed by whitespace, so a
            # bold "**Header**" line is not mistaken for one.
            is_bullet = line[0] in '-•' or (
                line[0] == '*' and len(line) > 1 and line[1].isspace()
            )

            if not is_bullet:
                section = detect_section(line)
                if section:
                    current_section = section
                continue

            point = line[1:].strip()
            looks_like_header = (
                point.rstrip('*_ ').endswith(':')
                or (len(point) > 4 and point.startswith('**') and point.endswith('**'))
            )
            if looks_like_header:
                section = detect_section(point)
                if section:
                    current_section = section
                    continue

            if current_section and len(point) > 5:
                result[current_section].append(point)

        return result

    def _calculate_average_credibility(self, summaries: List[DocumentSummary]) -> Optional[float]:
        """Average the credibility scores, ignoring summaries without one.

        Returns:
            The mean score, or None when no summary has a score.
        """
        scores = [s.credibility_score for s in summaries if s.credibility_score is not None]

        if not scores:
            return None

        return sum(scores) / len(scores)

    async def process_from_extraction_result(
        self,
        extraction_result: 'ExtractionResult'
    ) -> SummarizationOutput:
        """
        Run the summarization from an ExtractionResult.

        Args:
            extraction_result: Result of the content extraction; its
                ``documents`` attribute is summarized with default options.

        Returns:
            SummarizationOutput with every summary.

        Raises:
            ValueError: If the documents do not pass ``validate_input``.
        """
        input_data = SummarizationInput(
            documents=extraction_result.documents,
            summary_options={
                'include_sentiment': True,
                'include_citations': True,
                'max_key_points': 5,
                'detailed_analysis': True,
                'chunk_large_docs': True
            }
        )

        if not self.validate_input(input_data):
            self.logger.error("Input ExtractionResult invalide pour la summarization")
            raise ValueError("Input ExtractionResult invalide pour la summarization")

        return await self.process(input_data)


if __name__ == "__main__":
    # Manual smoke run: load a saved ExtractionResult, summarize it, save the
    # output as JSON and print a short report.
    import asyncio
    import json
    import sys
    from src.models.document_models import ExtractionResult

    # Used when no file is given as the first command-line argument.
    DEFAULT_EXTRACTION_FILE = "extraction_result_2docs_20251116_141527.json"

    def save_summarization_output(output, filename=None):
        """Save a SummarizationOutput as JSON.

        Args:
            output: The SummarizationOutput to serialize via ``model_dump``.
            filename: Target path; when omitted a timestamped name is built.

        Returns:
            The file name written, or None when saving failed.
        """
        from datetime import datetime
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"summarization_output_{len(output.summaries)}docs_{timestamp}.json"
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(output.model_dump(mode='json'), f, indent=2, ensure_ascii=False)
            return filename
        except Exception as e:
            print(f"Erreur lors de la sauvegarde: {e}")
            return None

    async def summarize_from_extraction_file(extraction_file=DEFAULT_EXTRACTION_FILE):
        """Load an ExtractionResult JSON file, summarize it and report."""
        try:
            with open(extraction_file, 'r', encoding='utf-8') as f:
                extraction_data = json.load(f)
            extraction_result = ExtractionResult(**extraction_data)
        except Exception as e:
            print(f"Erreur chargement ExtractionResult: {e}")
            return

        summarizer = SummarizerAgent()
        output = await summarizer.process_from_extraction_result(extraction_result)

        filename = save_summarization_output(output)
        if filename:
            print(f"✅ Résumés sauvegardés dans: {filename}")
        else:
            print("❌ Erreur lors de la sauvegarde du résumé.")

        for summary in output.summaries:
            print(f"\nRésumé pour {summary.title}:")
            print(f"Résumé exécutif: {summary.executive_summary[:200]}...")
            print(f"Points clés: {[kp.title for kp in summary.key_points]}")
            print(f"Sentiment: {summary.sentiment}")
            print(f"Score de crédibilité: {summary.credibility_score}")
        print(f"Temps total de traitement: {output.total_processing_time:.2f}s")
        print(f"Score de crédibilité moyen: {output.average_credibility}")

    asyncio.run(
        summarize_from_extraction_file(
            sys.argv[1] if len(sys.argv) > 1 else DEFAULT_EXTRACTION_FILE
        )
    )