Spaces:
Running
Running
D Ф m i И i q ц e L Ф y e r
Deploy SysCRED v2.3.1 - GraphRAG + LIAR benchmark + TREC integration
8e97fc5 | # -*- coding: utf-8 -*- | |
| """ | |
| GraphRAG Module - SysCRED | |
| ========================= | |
| Retrieves context from the Knowledge Graph to enhance verification. | |
| Transforms "Passive" Graph into "Active" Context. | |
| (c) Dominique S. Loyer - PhD Thesis Prototype | |
| """ | |
| from typing import List, Dict, Any, Optional | |
| from syscred.ontology_manager import OntologyManager | |
| class GraphRAG: | |
| """ | |
| Retrieval Augmented Generation using the Semantic Knowledge Graph. | |
| """ | |
| def __init__(self, ontology_manager: OntologyManager): | |
| self.om = ontology_manager | |
| def get_context(self, domain: str, keywords: List[str] = []) -> Dict[str, str]: | |
| """ | |
| Retrieve context for a specific verification task. | |
| Args: | |
| domain: The domain being analyzed (e.g., 'lemonde.fr') | |
| keywords: List of keywords from the claim (not yet used in V1) | |
| Returns: | |
| Dictionary with natural language context strings. | |
| """ | |
| if not self.om: | |
| return {"graph_context": "No ontology manager available."} | |
| context_parts = [] | |
| # 1. Source History | |
| source_history = self._get_source_history(domain) | |
| if source_history: | |
| context_parts.append(source_history) | |
| # 2. Pattern Matching (Similar Claims) | |
| similar_uris = [] | |
| if keywords: | |
| similar_result = self._find_similar_claims(keywords) | |
| if similar_result["text"]: | |
| context_parts.append(similar_result["text"]) | |
| similar_uris = similar_result["uris"] | |
| full_context = "\n\n".join(context_parts) if context_parts else "No prior knowledge found in the graph." | |
| return { | |
| "full_text": full_context, | |
| "source_history": source_history, | |
| "similar_uris": similar_uris # [NEW] Return URIs for linking | |
| } | |
| def _get_source_history(self, domain: str) -> str: | |
| """ | |
| Query the graph for all previous evaluations of this domain. | |
| """ | |
| if not domain: | |
| return "" | |
| # We reuse the specific query logic but tailored for retrieval | |
| query = """ | |
| PREFIX cred: <https://github.com/DominiqueLoyer/systemFactChecking#> | |
| SELECT ?score ?level ?timestamp | |
| WHERE { | |
| ?info cred:informationURL ?url . | |
| ?request cred:concernsInformation ?info . | |
| ?report cred:isReportOf ?request . | |
| ?report cred:credibilityScoreValue ?score . | |
| ?report cred:assignsCredibilityLevel ?level . | |
| ?report cred:completionTimestamp ?timestamp . | |
| FILTER(CONTAINS(STR(?url), "%s")) | |
| } | |
| ORDER BY DESC(?timestamp) | |
| LIMIT 5 | |
| """ % domain | |
| results = [] | |
| try: | |
| combined = self.om.base_graph + self.om.data_graph | |
| for row in combined.query(query): | |
| results.append({ | |
| "score": float(row.score), | |
| "level": str(row.level).split('#')[-1], | |
| "date": str(row.timestamp).split('T')[0] | |
| }) | |
| except Exception as e: | |
| print(f"[GraphRAG] Query error: {e}") | |
| return "" | |
| if not results: | |
| return f"The graph contains no previous evaluations for {domain}." | |
| # Summarize | |
| count = len(results) | |
| avg_score = sum(r['score'] for r in results) / count | |
| last_verdict = results[0]['level'] | |
| summary = ( | |
| f"Graph Memory for '{domain}':\n" | |
| f"- Analyzed {count} times previously.\n" | |
| f"- Average Credibility Score: {avg_score:.2f} / 1.0\n" | |
| f"- Most recent verdict ({results[0]['date']}): {last_verdict}.\n" | |
| ) | |
| return summary | |
| def _find_similar_claims(self, keywords: List[str]) -> Dict[str, Any]: | |
| """ | |
| Find evaluation history for content containing specific keywords. | |
| Returns dict with 'text' (for LLM) and 'uris' (for Graph linking). | |
| """ | |
| if not keywords: | |
| return {"text": "", "uris": [], "scores": []} | |
| # Build REGEX filter for keywords (OR logic) | |
| # e.g., (fake|hoax|conspiracy) | |
| clean_kws = [k for k in keywords if len(k) > 3] # Skip short words | |
| if not clean_kws: | |
| return {"text": "", "uris": [], "scores": []} | |
| regex_pattern = "|".join(clean_kws) | |
| query = """ | |
| PREFIX cred: <https://github.com/DominiqueLoyer/systemFactChecking#> | |
| SELECT ?report ?content ?score ?level ?timestamp | |
| WHERE { | |
| ?info cred:informationContent ?content . | |
| ?request cred:concernsInformation ?info . | |
| ?report cred:isReportOf ?request . | |
| ?report cred:credibilityScoreValue ?score . | |
| ?report cred:assignsCredibilityLevel ?level . | |
| ?report cred:completionTimestamp ?timestamp . | |
| FILTER(REGEX(?content, "%s", "i")) | |
| } | |
| ORDER BY DESC(?timestamp) | |
| LIMIT 3 | |
| """ % regex_pattern | |
| results = [] | |
| try: | |
| combined = self.om.base_graph + self.om.data_graph | |
| for row in combined.query(query): | |
| results.append({ | |
| "uri": str(row.report), | |
| "content": str(row.content)[:100] + "...", | |
| "score": float(row.score), | |
| "verdict": str(row.level).split('#')[-1] | |
| }) | |
| except Exception as e: | |
| print(f"[GraphRAG] Similar claims error: {e}") | |
| return {"text": "", "uris": [], "scores": []} | |
| if not results: | |
| return {"text": "", "uris": [], "scores": []} | |
| lines = [f"Found {len(results)} similar claims in history:"] | |
| for r in results: | |
| lines.append(f"- \"{r['content']}\" ({r['verdict']}, Score: {r['score']:.2f})") | |
| return { | |
| "text": "\n".join(lines), | |
| "uris": [r['uri'] for r in results], | |
| "scores": [r['score'] for r in results] | |
| } | |
| def compute_context_score(self, domain: str, keywords: List[str] = []) -> Dict[str, float]: | |
| """ | |
| Compute numerical context scores for integration into credibility scoring. | |
| This transforms the GraphRAG context into actionable numerical scores | |
| that can be directly used in the calculate_overall_score() function. | |
| Args: | |
| domain: The domain being analyzed (e.g., 'lemonde.fr') | |
| keywords: List of keywords from the claim | |
| Returns: | |
| Dictionary with: | |
| - 'history_score': 0.0-1.0 based on past evaluations of this domain | |
| - 'pattern_score': 0.0-1.0 based on similar claims in the graph | |
| - 'combined_score': Weighted average (0.7 * history + 0.3 * pattern) | |
| - 'confidence': How confident we are (based on amount of data) | |
| - 'has_history': Boolean if domain has prior evaluations | |
| """ | |
| result = { | |
| 'history_score': 0.5, # Neutral default | |
| 'pattern_score': 0.5, | |
| 'combined_score': 0.5, | |
| 'confidence': 0.0, | |
| 'has_history': False, | |
| 'history_count': 0, | |
| 'similar_count': 0 | |
| } | |
| if not self.om: | |
| return result | |
| # 1. Get source history score | |
| history_data = self._get_source_history_data(domain) | |
| if history_data['count'] > 0: | |
| result['history_score'] = history_data['avg_score'] | |
| result['has_history'] = True | |
| result['history_count'] = history_data['count'] | |
| # Confidence increases with more data points (max at 5) | |
| history_confidence = min(1.0, history_data['count'] / 5) | |
| else: | |
| history_confidence = 0.0 | |
| # 2. Get pattern score from similar claims | |
| if keywords: | |
| similar_result = self._find_similar_claims(keywords) | |
| scores = similar_result.get('scores', []) | |
| if scores: | |
| result['pattern_score'] = sum(scores) / len(scores) | |
| result['similar_count'] = len(scores) | |
| pattern_confidence = min(1.0, len(scores) / 3) | |
| else: | |
| pattern_confidence = 0.0 | |
| else: | |
| pattern_confidence = 0.0 | |
| # 3. Calculate combined score | |
| # Weight history more heavily than pattern matching | |
| if result['has_history'] and result['similar_count'] > 0: | |
| result['combined_score'] = 0.7 * result['history_score'] + 0.3 * result['pattern_score'] | |
| result['confidence'] = 0.6 * history_confidence + 0.4 * pattern_confidence | |
| elif result['has_history']: | |
| result['combined_score'] = result['history_score'] | |
| result['confidence'] = history_confidence * 0.8 # Reduce confidence without pattern | |
| elif result['similar_count'] > 0: | |
| result['combined_score'] = result['pattern_score'] | |
| result['confidence'] = pattern_confidence * 0.5 # Lower confidence with only patterns | |
| else: | |
| # No data available - return neutral | |
| result['combined_score'] = 0.5 | |
| result['confidence'] = 0.0 | |
| return result | |
| def _get_source_history_data(self, domain: str) -> Dict[str, Any]: | |
| """ | |
| Query the graph for evaluation statistics of this domain. | |
| Returns: | |
| Dictionary with 'count', 'avg_score', 'last_verdict', 'scores' | |
| """ | |
| if not domain: | |
| return {'count': 0, 'avg_score': 0.5, 'scores': []} | |
| query = """ | |
| PREFIX cred: <https://github.com/DominiqueLoyer/systemFactChecking#> | |
| SELECT ?score ?level ?timestamp | |
| WHERE { | |
| ?info cred:informationURL ?url . | |
| ?request cred:concernsInformation ?info . | |
| ?report cred:isReportOf ?request . | |
| ?report cred:credibilityScoreValue ?score . | |
| ?report cred:assignsCredibilityLevel ?level . | |
| ?report cred:completionTimestamp ?timestamp . | |
| FILTER(CONTAINS(STR(?url), "%s")) | |
| } | |
| ORDER BY DESC(?timestamp) | |
| LIMIT 10 | |
| """ % domain | |
| scores = [] | |
| last_verdict = None | |
| try: | |
| combined = self.om.base_graph + self.om.data_graph | |
| for i, row in enumerate(combined.query(query)): | |
| scores.append(float(row.score)) | |
| if i == 0: | |
| last_verdict = str(row.level).split('#')[-1] | |
| except Exception as e: | |
| print(f"[GraphRAG] History data query error: {e}") | |
| return {'count': 0, 'avg_score': 0.5, 'scores': []} | |
| if not scores: | |
| return {'count': 0, 'avg_score': 0.5, 'scores': []} | |
| return { | |
| 'count': len(scores), | |
| 'avg_score': sum(scores) / len(scores), | |
| 'last_verdict': last_verdict, | |
| 'scores': scores | |
| } | |