# agentgraph/reconstruction/content_reference_resolver.py
import logging
import re
from datetime import datetime
from typing import Any, Dict, List
from agentgraph.shared.models.reference_based.entity import Entity
from agentgraph.shared.models.reference_based.relation import Relation
from agentgraph.shared.models.reference_based.content_reference import ContentReference
from agentgraph.input.text_processing.trace_line_processor import TraceLineNumberProcessor
logger = logging.getLogger(__name__)
# Sentinel delimiter used to concatenate prompt snippets when more than one
# reference is resolved. We use the Unicode "SYMBOL FOR UNIT SEPARATOR" (U+241F),
# which is vanishingly unlikely to appear in user-supplied prompt text and is
# scrubbed from snippets before joining, eliminating the delimiter-collision
# issues seen with the previous "|||" sequence.
MULTI_SNIPPET_DELIMITER = "\u241F"
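# A minimal illustration (comment only, not executed at import time): because
# snippets are scrubbed of the sentinel before joining, the joined prompt can
# be split back into the original snippets losslessly, which the old "|||"
# sequence could not guarantee:
#
#     parts = ["first snippet", "second snippet"]
#     joined = MULTI_SNIPPET_DELIMITER.join(parts)
#     assert joined.split(MULTI_SNIPPET_DELIMITER) == parts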
class ContentReferenceResolver:
"""
Service for resolving ContentReference objects to actual content from original traces.
This enables efficient content retrieval while maintaining position-based references.
"""
def __init__(self):
self.line_processor = TraceLineNumberProcessor()
def resolve_entity_prompts(self,
entities: List[Entity],
original_trace: str,
window_metadata: Dict[str, Any]) -> List[Entity]:
"""
Resolve ContentReference objects in entities to actual prompt content.
Args:
entities: List of Entity objects that may contain ContentReference objects
original_trace: Original trace content (without line numbers)
            window_metadata: Metadata about the window including character positions
                (not currently consulted by this method)
Returns:
List of Entity objects with resolved prompt content
"""
if not entities or not original_trace:
return entities
        # Re-number the trace with the same <Ln> scheme used during extraction
        # so that ContentReference line ranges resolve to the correct lines
        numbered_content = self._create_extraction_compatible_numbering(original_trace)
resolved_entities = []
resolution_stats = {
"total_entities": len(entities),
"entities_with_refs": 0,
"successful_resolutions": 0,
"failed_resolutions": 0
}
for entity in entities:
resolved_entity = entity.model_copy() # Create a copy to avoid modifying original
# Check if entity has a content reference
if entity.raw_prompt_ref:
resolution_stats["entities_with_refs"] += 1
# Resolve the content reference
snippets, is_valid = self.line_processor.extract_content_by_reference(
numbered_content, entity.raw_prompt_ref
)
# Add detailed debug logging to track resolution process
logger.debug(f"Entity {entity.id} resolution debug:")
logger.debug(f" - raw_prompt_ref count: {len(entity.raw_prompt_ref)}")
for idx, ref in enumerate(entity.raw_prompt_ref):
logger.debug(f" - ref[{idx}]: L{ref.line_start}-L{ref.line_end}")
logger.debug(f" - extracted snippets count: {len(snippets) if snippets else 0}")
if snippets:
for idx, snippet in enumerate(snippets):
preview = snippet[:50].replace('\n', '\\n') if snippet else "EMPTY"
logger.debug(f" - snippet[{idx}]: {preview}...")
if snippets:
# Scrub any accidental occurrences of the delimiter inside the snippet
safe_snippets = [
s.replace(MULTI_SNIPPET_DELIMITER, " ") for s in snippets
]
# Concatenate snippets into a single string when multiple references exist
joined_prompt = (
safe_snippets[0]
if len(safe_snippets) == 1
else MULTI_SNIPPET_DELIMITER.join(safe_snippets)
)
resolved_entity.raw_prompt = joined_prompt
resolution_stats["successful_resolutions"] += 1
# Debug logging to check if line numbers are being removed
logger.debug(f"Resolved prompt for entity {entity.id}: {len(joined_prompt)} characters")
                    if re.search(r"<L\d+>", joined_prompt):
logger.warning(f"Line numbers still present in resolved entity {entity.id}: {joined_prompt[:100]}...")
else:
logger.debug(f"Entity {entity.id} prompt is clean (no line numbers detected)")
if len(safe_snippets) > 1:
logger.debug(f" - joined with delimiter, split count will be: {len(safe_snippets)}")
else:
# Keep original prompt if resolution failed
resolution_stats["failed_resolutions"] += 1
logger.warning(f"Failed to resolve prompt reference for entity {entity.id}")
resolved_entities.append(resolved_entity)
logger.info(f"Entity prompt resolution stats: {resolution_stats}")
return resolved_entities
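    # Because multiple snippets are joined with MULTI_SNIPPET_DELIMITER, a
    # downstream consumer can recover the individual snippets (sketch; assumes
    # `resolved` came from resolve_entity_prompts above):
    #
    #     snippets = resolved[0].raw_prompt.split(MULTI_SNIPPET_DELIMITER)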
def resolve_relation_prompts(self,
relations: List[Relation],
original_trace: str,
window_metadata: Dict[str, Any]) -> List[Relation]:
"""
Resolve ContentReference objects in relations to actual interaction prompt content.
Args:
relations: List of Relation objects that may contain ContentReference objects
original_trace: Original trace content (without line numbers)
            window_metadata: Metadata about the window including character positions
                (not currently consulted by this method)
Returns:
List of Relation objects with resolved interaction prompt content
"""
if not relations or not original_trace:
return relations
numbered_content = self._create_extraction_compatible_numbering(original_trace)
resolved_relations = []
resolution_stats = {
"total_relations": len(relations),
"relations_with_refs": 0,
"successful_resolutions": 0,
"failed_resolutions": 0
}
for relation in relations:
resolved_relation = relation.model_copy() # Create a copy to avoid modifying original
# Check if relation has a content reference
if relation.interaction_prompt_ref:
resolution_stats["relations_with_refs"] += 1
# Resolve the content reference
snippets, is_valid = self.line_processor.extract_content_by_reference(
numbered_content, relation.interaction_prompt_ref
)
if snippets:
# Scrub any accidental occurrences of the delimiter inside the snippet
safe_snippets = [
s.replace(MULTI_SNIPPET_DELIMITER, " ") for s in snippets
]
# Concatenate snippets into a single string when multiple references exist
joined_prompt = (
safe_snippets[0]
if len(safe_snippets) == 1
else MULTI_SNIPPET_DELIMITER.join(safe_snippets)
)
resolved_relation.interaction_prompt = joined_prompt
resolution_stats["successful_resolutions"] += 1
# Debug logging to check if line numbers are being removed
logger.debug(f"Resolved interaction prompt for relation {relation.id}: {len(joined_prompt)} characters")
                    if re.search(r"<L\d+>", joined_prompt):
logger.warning(f"Line numbers still present in resolved relation {relation.id}: {joined_prompt[:100]}...")
else:
logger.debug(f"Relation {relation.id} prompt is clean (no line numbers detected)")
else:
# Keep original prompt if resolution failed
resolution_stats["failed_resolutions"] += 1
logger.warning(f"Failed to resolve interaction prompt reference for relation {relation.id}")
resolved_relations.append(resolved_relation)
logger.info(f"Relation prompt resolution stats: {resolution_stats}")
return resolved_relations
def resolve_knowledge_graph_content(self,
knowledge_graph: Dict[str, Any],
original_trace: str,
window_metadata: Dict[str, Any]) -> Dict[str, Any]:
"""
Resolve all ContentReference objects in a knowledge graph to actual content.
Args:
knowledge_graph: Knowledge graph dictionary containing entities and relations
original_trace: Original trace content (without line numbers)
window_metadata: Metadata about the window including character positions
Returns:
Knowledge graph with resolved content references
"""
if not knowledge_graph or not original_trace:
return knowledge_graph
resolved_kg = knowledge_graph.copy()
# Resolve entity prompts
if "entities" in resolved_kg:
# Convert dict entities to Entity objects if needed
entities = []
for entity_data in resolved_kg["entities"]:
if isinstance(entity_data, dict):
entity = Entity(**entity_data)
else:
entity = entity_data
entities.append(entity)
resolved_entities = self.resolve_entity_prompts(entities, original_trace, window_metadata)
# Convert back to dict format
resolved_kg["entities"] = [entity.model_dump() for entity in resolved_entities]
# Resolve relation prompts
if "relations" in resolved_kg:
# Convert dict relations to Relation objects if needed
relations = []
for relation_data in resolved_kg["relations"]:
if isinstance(relation_data, dict):
relation = Relation(**relation_data)
else:
relation = relation_data
relations.append(relation)
resolved_relations = self.resolve_relation_prompts(relations, original_trace, window_metadata)
# Convert back to dict format
resolved_kg["relations"] = [relation.model_dump() for relation in resolved_relations]
        # Add resolution metadata (copy any existing metadata dict so the
        # caller's original graph is not mutated through the shallow copy)
        resolved_kg["metadata"] = dict(resolved_kg.get("metadata") or {})
resolved_kg["metadata"]["content_resolution"] = {
"resolved_at": self._get_current_timestamp(),
"original_trace_length": len(original_trace),
"resolution_method": "content_reference_resolver"
}
logger.info(f"Resolved content references for knowledge graph with {len(resolved_kg.get('entities', []))} entities and {len(resolved_kg.get('relations', []))} relations")
return resolved_kg
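    # Hedged end-to-end sketch on a dict-form knowledge graph (the entity and
    # relation dicts must satisfy the Entity/Relation model fields):
    #
    #     kg = {"entities": [...], "relations": [...]}
    #     resolved_kg = resolver.resolve_knowledge_graph_content(kg, trace, meta)
    #     resolved_kg["metadata"]["content_resolution"]["resolution_method"]
    #     # -> "content_reference_resolver"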
def validate_content_references(self,
content_refs: List[ContentReference],
original_trace: str) -> Dict[str, Any]:
"""
Validate a list of ContentReference objects against the original trace.
Args:
content_refs: List of ContentReference objects to validate
original_trace: Original trace content
Returns:
Validation report dictionary
"""
if not content_refs or not original_trace:
return {"valid_references": 0, "invalid_references": 0, "details": []}
# Use extraction-compatible numbering for validation
numbered_content = self._create_extraction_compatible_numbering(original_trace)
total_lines = len(original_trace.split('\n'))
validation_report = {
"total_references": len(content_refs),
"valid_references": 0,
"invalid_references": 0,
"details": []
}
for i, content_ref in enumerate(content_refs):
detail = {
"index": i,
"content_type": content_ref.content_type,
"line_range": f"{content_ref.line_start}-{content_ref.line_end}",
"is_valid": True,
"issues": []
}
# Check line range validity
if content_ref.line_start < 1 or content_ref.line_end < 1:
detail["is_valid"] = False
detail["issues"].append("Line numbers must be >= 1")
if content_ref.line_start > total_lines or content_ref.line_end > total_lines:
detail["is_valid"] = False
detail["issues"].append(f"Line numbers exceed total lines ({total_lines})")
if not content_ref.validate_line_range():
detail["is_valid"] = False
detail["issues"].append("line_end must be >= line_start")
# Try to extract content and validate
try:
extracted_content, content_valid = self.line_processor.extract_content_by_reference(
numbered_content, content_ref
)
if not content_valid:
detail["is_valid"] = False
detail["issues"].append("Content does not match summary")
except Exception as e:
detail["is_valid"] = False
detail["issues"].append(f"Extraction error: {str(e)}")
if detail["is_valid"]:
validation_report["valid_references"] += 1
else:
validation_report["invalid_references"] += 1
validation_report["details"].append(detail)
return validation_report
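    # Sketch of surfacing only the failures from a validation report:
    #
    #     report = resolver.validate_content_references(refs, trace)
    #     bad = [d for d in report["details"] if not d["is_valid"]]
    #     for d in bad:
    #         logger.warning("ref %s (%s): %s", d["index"], d["line_range"], d["issues"])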
def _create_extraction_compatible_numbering(self, original_trace: str) -> str:
"""
Create numbered content using the same line numbering scheme as extraction.
This method replicates the character-to-line mapping logic from ChunkingService
to ensure ContentReferences resolve to the correct content.
Args:
original_trace: Original trace content (without line numbers)
Returns:
Content with line numbers that match extraction numbering
"""
# Step 1: Create character-to-line mapping (same as ChunkingService)
original_lines = original_trace.split('\n')
char_to_line_map = {}
char_pos = 0
for line_num, line in enumerate(original_lines, 1):
# Map every character in this line to this line number
for i in range(len(line) + 1): # +1 for newline
if char_pos + i < len(original_trace):
char_to_line_map[char_pos + i] = line_num
char_pos += len(line) + 1 # +1 for newline
# Step 2: Add line numbers to each line using its actual line number
numbered_lines = []
for line_num, line in enumerate(original_lines, 1):
numbered_line = f"<L{line_num}> {line}"
numbered_lines.append(numbered_line)
numbered_content = '\n'.join(numbered_lines)
logger.debug(f"Created extraction-compatible numbering for {len(original_lines)} lines")
return numbered_content
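    # For a two-line trace "alpha\nbeta", the numbered form produced above is:
    #
    #     <L1> alpha
    #     <L2> beta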
    def _get_current_timestamp(self) -> str:
        """Return the current time as an ISO 8601 string."""
        return datetime.now().isoformat()
def get_resolution_statistics(self,
knowledge_graph: Dict[str, Any]) -> Dict[str, Any]:
"""
Get statistics about content references in a knowledge graph.
Args:
knowledge_graph: Knowledge graph to analyze
Returns:
Statistics dictionary
"""
stats = {
"entities": {
"total": 0,
"with_references": 0,
"with_resolved_content": 0
},
"relations": {
"total": 0,
"with_references": 0,
"with_resolved_content": 0
}
}
# Analyze entities
if "entities" in knowledge_graph:
stats["entities"]["total"] = len(knowledge_graph["entities"])
            for entity_data in knowledge_graph["entities"]:
                if entity_data.get("raw_prompt_ref"):
                    stats["entities"]["with_references"] += 1
                if entity_data.get("raw_prompt"):
                    stats["entities"]["with_resolved_content"] += 1
# Analyze relations
if "relations" in knowledge_graph:
stats["relations"]["total"] = len(knowledge_graph["relations"])
            for relation_data in knowledge_graph["relations"]:
                if relation_data.get("interaction_prompt_ref"):
                    stats["relations"]["with_references"] += 1
                if relation_data.get("interaction_prompt"):
                    stats["relations"]["with_resolved_content"] += 1
return stats
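if __name__ == "__main__":
    # Hedged smoke test: get_resolution_statistics only inspects plain dicts,
    # so it can run without constructing Entity/Relation model instances.
    # The field values below are made up for illustration.
    logging.basicConfig(level=logging.DEBUG)
    resolver = ContentReferenceResolver()
    sample_kg = {
        "entities": [
            {"raw_prompt_ref": [{"line_start": 1, "line_end": 2}], "raw_prompt": "example prompt"},
            {"raw_prompt_ref": None},
        ],
        "relations": [],
    }
    print(resolver.get_resolution_statistics(sample_kg))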