# syscred/graph_rag.py — Dominique Loyer
# Deploy SysCRED v2.3.1 - GraphRAG + LIAR benchmark + TREC integration (commit 8e97fc5)
# -*- coding: utf-8 -*-
"""
GraphRAG Module - SysCRED
=========================
Retrieves context from the Knowledge Graph to enhance verification.
Transforms "Passive" Graph into "Active" Context.
(c) Dominique S. Loyer - PhD Thesis Prototype
"""
from typing import List, Dict, Any, Optional
from syscred.ontology_manager import OntologyManager
class GraphRAG:
    """
    Retrieval Augmented Generation over the semantic Knowledge Graph.

    Turns the "passive" RDF graph into "active" context: given a domain and
    claim keywords, retrieves prior evaluations and similar claims, and
    exposes them both as natural-language context (for an LLM prompt) and
    as numerical scores (for the credibility formula).
    """

    def __init__(self, ontology_manager: "OntologyManager"):
        """
        Args:
            ontology_manager: Provides ``base_graph`` and ``data_graph``
                (rdflib Graphs) that are merged for every query. May be
                None/falsy, in which case all retrievals degrade gracefully.
        """
        self.om = ontology_manager

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _combined_graph(self):
        """Merge the ontology (base) and data graphs for querying."""
        return self.om.base_graph + self.om.data_graph

    @staticmethod
    def _sparql_escape(value: str) -> str:
        """Escape backslashes and double quotes so a user-supplied value
        cannot break out of a SPARQL string literal (injection guard)."""
        return value.replace('\\', '\\\\').replace('"', '\\"')

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def get_context(self, domain: str, keywords: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Retrieve natural-language context for a verification task.

        Args:
            domain: The domain being analyzed (e.g., 'lemonde.fr').
            keywords: Keywords extracted from the claim, used to find
                similar previously evaluated claims. May be None or empty.

        Returns:
            Dictionary with:
              - 'full_text': merged context paragraphs for the LLM prompt
              - 'source_history': summary of prior evaluations of `domain`
              - 'similar_uris': report URIs of similar claims (for linking)
        """
        if not self.om:
            # Keep the legacy 'graph_context' key for existing callers, but
            # also provide the regular keys so the return shape is stable.
            msg = "No ontology manager available."
            return {
                "graph_context": msg,
                "full_text": msg,
                "source_history": "",
                "similar_uris": [],
            }
        context_parts = []
        # 1. Source history: prior evaluations of the same domain.
        source_history = self._get_source_history(domain)
        if source_history:
            context_parts.append(source_history)
        # 2. Pattern matching: previously evaluated similar claims.
        similar_uris: List[str] = []
        if keywords:
            similar_result = self._find_similar_claims(keywords)
            if similar_result["text"]:
                context_parts.append(similar_result["text"])
            similar_uris = similar_result["uris"]
        full_context = "\n\n".join(context_parts) if context_parts else "No prior knowledge found in the graph."
        return {
            "full_text": full_context,
            "source_history": source_history,
            "similar_uris": similar_uris,  # URIs for linking back into the graph
        }

    def _get_source_history(self, domain: str) -> str:
        """
        Summarize the most recent evaluations of `domain` found in the graph.

        Returns a short natural-language paragraph, an explanatory sentence
        when the domain has no history, or "" on empty domain / query error.
        """
        if not domain:
            return ""
        # Only the 5 most recent reports are summarized; the FILTER matches
        # the domain as a substring of the stored information URL.
        query = """
        PREFIX cred: <https://github.com/DominiqueLoyer/systemFactChecking#>
        SELECT ?score ?level ?timestamp
        WHERE {
            ?info cred:informationURL ?url .
            ?request cred:concernsInformation ?info .
            ?report cred:isReportOf ?request .
            ?report cred:credibilityScoreValue ?score .
            ?report cred:assignsCredibilityLevel ?level .
            ?report cred:completionTimestamp ?timestamp .
            FILTER(CONTAINS(STR(?url), "%s"))
        }
        ORDER BY DESC(?timestamp)
        LIMIT 5
        """ % self._sparql_escape(domain)
        results = []
        try:
            for row in self._combined_graph().query(query):
                results.append({
                    "score": float(row.score),
                    # Strip the namespace; keep the local name (e.g. 'High').
                    "level": str(row.level).split('#')[-1],
                    # ISO timestamp -> keep the date part only.
                    "date": str(row.timestamp).split('T')[0],
                })
        except Exception as e:
            # Best-effort retrieval: a failed query degrades to "no context";
            # it must never break the verification pipeline.
            print(f"[GraphRAG] Query error: {e}")
            return ""
        if not results:
            return f"The graph contains no previous evaluations for {domain}."
        count = len(results)
        avg_score = sum(r['score'] for r in results) / count
        last_verdict = results[0]['level']
        return (
            f"Graph Memory for '{domain}':\n"
            f"- Analyzed {count} times previously.\n"
            f"- Average Credibility Score: {avg_score:.2f} / 1.0\n"
            f"- Most recent verdict ({results[0]['date']}): {last_verdict}.\n"
        )

    def _find_similar_claims(self, keywords: List[str]) -> Dict[str, Any]:
        """
        Find evaluation history for content matching any of the keywords.

        Returns:
            Dict with 'text' (for the LLM), 'uris' (report URIs for graph
            linking) and 'scores' (floats for numerical scoring).
        """
        empty = {"text": "", "uris": [], "scores": []}
        if not keywords:
            return empty
        # Keep only discriminative keywords (skip short, stopword-like
        # tokens), then OR them together, e.g. (fake|hoax|conspiracy).
        clean_kws = [k for k in keywords if len(k) > 3]
        if not clean_kws:
            return empty
        # Escaped so a keyword cannot break out of the SPARQL string literal.
        # NOTE(review): regex metacharacters inside keywords are passed
        # through; a malformed pattern only makes the query fail (handled
        # by the except below).
        regex_pattern = self._sparql_escape("|".join(clean_kws))
        query = """
        PREFIX cred: <https://github.com/DominiqueLoyer/systemFactChecking#>
        SELECT ?report ?content ?score ?level ?timestamp
        WHERE {
            ?info cred:informationContent ?content .
            ?request cred:concernsInformation ?info .
            ?report cred:isReportOf ?request .
            ?report cred:credibilityScoreValue ?score .
            ?report cred:assignsCredibilityLevel ?level .
            ?report cred:completionTimestamp ?timestamp .
            FILTER(REGEX(?content, "%s", "i"))
        }
        ORDER BY DESC(?timestamp)
        LIMIT 3
        """ % regex_pattern
        results = []
        try:
            for row in self._combined_graph().query(query):
                results.append({
                    "uri": str(row.report),
                    # Truncate long content to keep the prompt compact.
                    "content": str(row.content)[:100] + "...",
                    "score": float(row.score),
                    "verdict": str(row.level).split('#')[-1],
                })
        except Exception as e:
            print(f"[GraphRAG] Similar claims error: {e}")
            return empty
        if not results:
            return empty
        lines = [f"Found {len(results)} similar claims in history:"]
        for r in results:
            lines.append(f"- \"{r['content']}\" ({r['verdict']}, Score: {r['score']:.2f})")
        return {
            "text": "\n".join(lines),
            "uris": [r['uri'] for r in results],
            "scores": [r['score'] for r in results],
        }

    def compute_context_score(self, domain: str, keywords: Optional[List[str]] = None) -> Dict[str, float]:
        """
        Compute numerical context scores for the credibility formula.

        Transforms the GraphRAG context into actionable numbers that can be
        used directly in ``calculate_overall_score()``.

        Args:
            domain: The domain being analyzed (e.g., 'lemonde.fr').
            keywords: Keywords extracted from the claim. May be None/empty.

        Returns:
            Dictionary with:
              - 'history_score': 0.0-1.0 from past evaluations of the domain
              - 'pattern_score': 0.0-1.0 from similar claims in the graph
              - 'combined_score': weighted mix (0.7 history + 0.3 pattern)
              - 'confidence': 0.0-1.0, grows with the amount of data
              - 'has_history': whether the domain has prior evaluations
              - 'history_count' / 'similar_count': data-point counts
        """
        result = {
            'history_score': 0.5,  # Neutral default when no data exists
            'pattern_score': 0.5,
            'combined_score': 0.5,
            'confidence': 0.0,
            'has_history': False,
            'history_count': 0,
            'similar_count': 0,
        }
        if not self.om:
            return result
        # 1. Source-history score for the domain.
        history_data = self._get_source_history_data(domain)
        if history_data['count'] > 0:
            result['history_score'] = history_data['avg_score']
            result['has_history'] = True
            result['history_count'] = history_data['count']
            # Confidence saturates at 5 prior data points.
            history_confidence = min(1.0, history_data['count'] / 5)
        else:
            history_confidence = 0.0
        # 2. Pattern score from similar claims.
        pattern_confidence = 0.0
        if keywords:
            similar_result = self._find_similar_claims(keywords)
            scores = similar_result.get('scores', [])
            if scores:
                result['pattern_score'] = sum(scores) / len(scores)
                result['similar_count'] = len(scores)
                # Confidence saturates at 3 similar claims (the query LIMIT).
                pattern_confidence = min(1.0, len(scores) / 3)
        # 3. Combine: history weighs more than pattern matching.
        if result['has_history'] and result['similar_count'] > 0:
            result['combined_score'] = 0.7 * result['history_score'] + 0.3 * result['pattern_score']
            result['confidence'] = 0.6 * history_confidence + 0.4 * pattern_confidence
        elif result['has_history']:
            result['combined_score'] = result['history_score']
            result['confidence'] = history_confidence * 0.8  # No pattern evidence
        elif result['similar_count'] > 0:
            result['combined_score'] = result['pattern_score']
            result['confidence'] = pattern_confidence * 0.5  # Patterns only
        else:
            # No data available — stay neutral.
            result['combined_score'] = 0.5
            result['confidence'] = 0.0
        return result

    def _get_source_history_data(self, domain: str) -> Dict[str, Any]:
        """
        Query the graph for evaluation statistics of `domain`.

        Returns:
            Dict with 'count', 'avg_score', 'last_verdict', 'scores'
            ('last_verdict' is absent on the empty/error paths).
        """
        if not domain:
            return {'count': 0, 'avg_score': 0.5, 'scores': []}
        query = """
        PREFIX cred: <https://github.com/DominiqueLoyer/systemFactChecking#>
        SELECT ?score ?level ?timestamp
        WHERE {
            ?info cred:informationURL ?url .
            ?request cred:concernsInformation ?info .
            ?report cred:isReportOf ?request .
            ?report cred:credibilityScoreValue ?score .
            ?report cred:assignsCredibilityLevel ?level .
            ?report cred:completionTimestamp ?timestamp .
            FILTER(CONTAINS(STR(?url), "%s"))
        }
        ORDER BY DESC(?timestamp)
        LIMIT 10
        """ % self._sparql_escape(domain)
        scores = []
        last_verdict = None
        try:
            for i, row in enumerate(self._combined_graph().query(query)):
                scores.append(float(row.score))
                if i == 0:
                    # Rows come newest-first (ORDER BY DESC ?timestamp).
                    last_verdict = str(row.level).split('#')[-1]
        except Exception as e:
            print(f"[GraphRAG] History data query error: {e}")
            return {'count': 0, 'avg_score': 0.5, 'scores': []}
        if not scores:
            return {'count': 0, 'avg_score': 0.5, 'scores': []}
        return {
            'count': len(scores),
            'avg_score': sum(scores) / len(scores),
            'last_verdict': last_verdict,
            'scores': scores,
        }