import logging
from typing import List, Dict, Any

from agentgraph.shared.models.reference_based.entity import Entity
from agentgraph.shared.models.reference_based.relation import Relation
from agentgraph.shared.models.reference_based.content_reference import ContentReference
from agentgraph.input.text_processing.trace_line_processor import TraceLineNumberProcessor

logger = logging.getLogger(__name__)

# Sentinel delimiter used to concatenate multiple prompt snippets when more than one
# reference is resolved. We use the Unicode "SYMBOL FOR UNIT SEPARATOR" (U+241F),
# which will never legitimately appear inside user-supplied prompt text, eliminating
# the delimiter-collision issues seen with the previous "|||" sequence.
MULTI_SNIPPET_DELIMITER = "\u241F"
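# Illustrative round trip (an assumed downstream pattern, not code from this module):
# a consumer that needs the individual snippets back can split on the sentinel, e.g.
#
#     "alpha\u241Fbeta".split(MULTI_SNIPPET_DELIMITER) == ["alpha", "beta"]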
class ContentReferenceResolver:
    """
    Service for resolving ContentReference objects to actual content from original traces.

    This enables efficient content retrieval while maintaining position-based references.
    """

    def __init__(self):
        self.line_processor = TraceLineNumberProcessor()

    def resolve_entity_prompts(self,
                               entities: List[Entity],
                               original_trace: str,
                               window_metadata: Dict[str, Any]) -> List[Entity]:
        """
        Resolve ContentReference objects in entities to actual prompt content.

        Args:
            entities: List of Entity objects that may contain ContentReference objects
            original_trace: Original trace content (without line numbers)
            window_metadata: Metadata about the window, including character positions

        Returns:
            List of Entity objects with resolved prompt content
        """
        if not entities or not original_trace:
            return entities

        # CRITICAL FIX: use the same line numbering scheme as extraction, so that
        # ContentReferences point at the correct lines.
        numbered_content = self._create_extraction_compatible_numbering(original_trace)

        resolved_entities = []
        resolution_stats = {
            "total_entities": len(entities),
            "entities_with_refs": 0,
            "successful_resolutions": 0,
            "failed_resolutions": 0,
        }

        for entity in entities:
            resolved_entity = entity.model_copy()  # Copy to avoid mutating the original

            # Check whether the entity carries a content reference
            if entity.raw_prompt_ref:
                resolution_stats["entities_with_refs"] += 1

                # Resolve the content reference
                snippets, is_valid = self.line_processor.extract_content_by_reference(
                    numbered_content, entity.raw_prompt_ref
                )

                # Detailed debug logging to trace the resolution process
                logger.debug(f"Entity {entity.id} resolution debug:")
                logger.debug(f"  - raw_prompt_ref count: {len(entity.raw_prompt_ref)}")
                for idx, ref in enumerate(entity.raw_prompt_ref):
                    logger.debug(f"  - ref[{idx}]: L{ref.line_start}-L{ref.line_end}")
                logger.debug(f"  - extracted snippets count: {len(snippets) if snippets else 0}")
                if snippets:
                    for idx, snippet in enumerate(snippets):
                        preview = snippet[:50].replace('\n', '\\n') if snippet else "EMPTY"
                        logger.debug(f"  - snippet[{idx}]: {preview}...")

                if snippets:
                    # Scrub any accidental occurrences of the delimiter inside the snippets
                    safe_snippets = [
                        s.replace(MULTI_SNIPPET_DELIMITER, " ") for s in snippets
                    ]
                    # Concatenate snippets into a single string when multiple references exist
                    joined_prompt = (
                        safe_snippets[0]
                        if len(safe_snippets) == 1
                        else MULTI_SNIPPET_DELIMITER.join(safe_snippets)
                    )
                    resolved_entity.raw_prompt = joined_prompt
                    resolution_stats["successful_resolutions"] += 1

                    # Debug logging to check that line-number markers were stripped
                    logger.debug(f"Resolved prompt for entity {entity.id}: {len(joined_prompt)} characters")
                    if '<L' in joined_prompt and '>' in joined_prompt:
                        logger.warning(f"Line numbers still present in resolved entity {entity.id}: {joined_prompt[:100]}...")
                    else:
                        logger.debug(f"Entity {entity.id} prompt is clean (no line numbers detected)")
                    if len(safe_snippets) > 1:
                        logger.debug(f"  - joined with delimiter, split count will be: {len(safe_snippets)}")
                else:
                    # Keep the original prompt if resolution failed
                    resolution_stats["failed_resolutions"] += 1
                    logger.warning(f"Failed to resolve prompt reference for entity {entity.id}")

            resolved_entities.append(resolved_entity)

        logger.info(f"Entity prompt resolution stats: {resolution_stats}")
        return resolved_entities
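    # A minimal usage sketch, hedged: `entities` and `trace_text` are assumed to be
    # constructed elsewhere, and nothing below adds to this module's API.
    #
    #     resolver = ContentReferenceResolver()
    #     resolved = resolver.resolve_entity_prompts(entities, trace_text, window_metadata={})
    #     prompts = [e.raw_prompt for e in resolved if e.raw_prompt]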
    def resolve_relation_prompts(self,
                                 relations: List[Relation],
                                 original_trace: str,
                                 window_metadata: Dict[str, Any]) -> List[Relation]:
        """
        Resolve ContentReference objects in relations to actual interaction prompt content.

        Args:
            relations: List of Relation objects that may contain ContentReference objects
            original_trace: Original trace content (without line numbers)
            window_metadata: Metadata about the window, including character positions

        Returns:
            List of Relation objects with resolved interaction prompt content
        """
        if not relations or not original_trace:
            return relations

        numbered_content = self._create_extraction_compatible_numbering(original_trace)

        resolved_relations = []
        resolution_stats = {
            "total_relations": len(relations),
            "relations_with_refs": 0,
            "successful_resolutions": 0,
            "failed_resolutions": 0,
        }

        for relation in relations:
            resolved_relation = relation.model_copy()  # Copy to avoid mutating the original

            # Check whether the relation carries a content reference
            if relation.interaction_prompt_ref:
                resolution_stats["relations_with_refs"] += 1

                # Resolve the content reference
                snippets, is_valid = self.line_processor.extract_content_by_reference(
                    numbered_content, relation.interaction_prompt_ref
                )

                if snippets:
                    # Scrub any accidental occurrences of the delimiter inside the snippets
                    safe_snippets = [
                        s.replace(MULTI_SNIPPET_DELIMITER, " ") for s in snippets
                    ]
                    # Concatenate snippets into a single string when multiple references exist
                    joined_prompt = (
                        safe_snippets[0]
                        if len(safe_snippets) == 1
                        else MULTI_SNIPPET_DELIMITER.join(safe_snippets)
                    )
                    resolved_relation.interaction_prompt = joined_prompt
                    resolution_stats["successful_resolutions"] += 1

                    # Debug logging to check that line-number markers were stripped
                    logger.debug(f"Resolved interaction prompt for relation {relation.id}: {len(joined_prompt)} characters")
                    if '<L' in joined_prompt and '>' in joined_prompt:
                        logger.warning(f"Line numbers still present in resolved relation {relation.id}: {joined_prompt[:100]}...")
                    else:
                        logger.debug(f"Relation {relation.id} prompt is clean (no line numbers detected)")
                else:
                    # Keep the original prompt if resolution failed
                    resolution_stats["failed_resolutions"] += 1
                    logger.warning(f"Failed to resolve interaction prompt reference for relation {relation.id}")

            resolved_relations.append(resolved_relation)

        logger.info(f"Relation prompt resolution stats: {resolution_stats}")
        return resolved_relations
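    # Usage mirrors resolve_entity_prompts above: pass Relation objects and read
    # back `interaction_prompt` on the returned copies, e.g. (illustrative only)
    #
    #     resolved = resolver.resolve_relation_prompts(relations, trace_text, window_metadata={})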
    def resolve_knowledge_graph_content(self,
                                        knowledge_graph: Dict[str, Any],
                                        original_trace: str,
                                        window_metadata: Dict[str, Any]) -> Dict[str, Any]:
        """
        Resolve all ContentReference objects in a knowledge graph to actual content.

        Args:
            knowledge_graph: Knowledge graph dictionary containing entities and relations
            original_trace: Original trace content (without line numbers)
            window_metadata: Metadata about the window, including character positions

        Returns:
            Knowledge graph with resolved content references
        """
        if not knowledge_graph or not original_trace:
            return knowledge_graph

        resolved_kg = knowledge_graph.copy()

        # Resolve entity prompts
        if "entities" in resolved_kg:
            # Convert dict entities to Entity objects if needed
            entities = []
            for entity_data in resolved_kg["entities"]:
                if isinstance(entity_data, dict):
                    entity = Entity(**entity_data)
                else:
                    entity = entity_data
                entities.append(entity)

            resolved_entities = self.resolve_entity_prompts(entities, original_trace, window_metadata)
            # Convert back to dict format
            resolved_kg["entities"] = [entity.model_dump() for entity in resolved_entities]

        # Resolve relation prompts
        if "relations" in resolved_kg:
            # Convert dict relations to Relation objects if needed
            relations = []
            for relation_data in resolved_kg["relations"]:
                if isinstance(relation_data, dict):
                    relation = Relation(**relation_data)
                else:
                    relation = relation_data
                relations.append(relation)

            resolved_relations = self.resolve_relation_prompts(relations, original_trace, window_metadata)
            # Convert back to dict format
            resolved_kg["relations"] = [relation.model_dump() for relation in resolved_relations]

        # Add resolution metadata
        if "metadata" not in resolved_kg:
            resolved_kg["metadata"] = {}
        resolved_kg["metadata"]["content_resolution"] = {
            "resolved_at": self._get_current_timestamp(),
            "original_trace_length": len(original_trace),
            "resolution_method": "content_reference_resolver",
        }

        logger.info(
            f"Resolved content references for knowledge graph with "
            f"{len(resolved_kg.get('entities', []))} entities and "
            f"{len(resolved_kg.get('relations', []))} relations"
        )
        return resolved_kg
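    # Sketch of the expected input shape, inferred from the conversions above;
    # fields beyond "entities"/"relations"/"metadata" belong to the Entity and
    # Relation models, so the keys shown here are assumptions:
    #
    #     kg = {
    #         "entities": [{"id": "e1", "raw_prompt_ref": [...]}],
    #         "relations": [{"id": "r1", "interaction_prompt_ref": [...]}],
    #     }
    #     resolved = resolver.resolve_knowledge_graph_content(kg, trace_text, {})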
    def validate_content_references(self,
                                    content_refs: List[ContentReference],
                                    original_trace: str) -> Dict[str, Any]:
        """
        Validate a list of ContentReference objects against the original trace.

        Args:
            content_refs: List of ContentReference objects to validate
            original_trace: Original trace content

        Returns:
            Validation report dictionary
        """
        if not content_refs or not original_trace:
            return {"valid_references": 0, "invalid_references": 0, "details": []}

        # Use extraction-compatible numbering for validation
        numbered_content = self._create_extraction_compatible_numbering(original_trace)
        total_lines = len(original_trace.split('\n'))

        validation_report = {
            "total_references": len(content_refs),
            "valid_references": 0,
            "invalid_references": 0,
            "details": [],
        }

        for i, content_ref in enumerate(content_refs):
            detail = {
                "index": i,
                "content_type": content_ref.content_type,
                "line_range": f"{content_ref.line_start}-{content_ref.line_end}",
                "is_valid": True,
                "issues": [],
            }

            # Check line range validity
            if content_ref.line_start < 1 or content_ref.line_end < 1:
                detail["is_valid"] = False
                detail["issues"].append("Line numbers must be >= 1")
            if content_ref.line_start > total_lines or content_ref.line_end > total_lines:
                detail["is_valid"] = False
                detail["issues"].append(f"Line numbers exceed total lines ({total_lines})")
            if not content_ref.validate_line_range():
                detail["is_valid"] = False
                detail["issues"].append("line_end must be >= line_start")

            # Try to extract the content and validate it
            try:
                extracted_content, content_valid = self.line_processor.extract_content_by_reference(
                    numbered_content, content_ref
                )
                if not content_valid:
                    detail["is_valid"] = False
                    detail["issues"].append("Content does not match summary")
            except Exception as e:
                detail["is_valid"] = False
                detail["issues"].append(f"Extraction error: {str(e)}")

            if detail["is_valid"]:
                validation_report["valid_references"] += 1
            else:
                validation_report["invalid_references"] += 1
            validation_report["details"].append(detail)

        return validation_report
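    # Shape of the report assembled above, with illustrative values:
    #
    #     {
    #         "total_references": 2,
    #         "valid_references": 1,
    #         "invalid_references": 1,
    #         "details": [
    #             {"index": 0, "content_type": "prompt", "line_range": "3-5",
    #              "is_valid": True, "issues": []},
    #             ...
    #         ],
    #     }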
    def _create_extraction_compatible_numbering(self, original_trace: str) -> str:
        """
        Create numbered content using the same line numbering scheme as extraction.

        Numbering each line by its 1-based index reproduces the character-to-line
        mapping used by ChunkingService, so ContentReferences resolve to the
        correct content.

        Args:
            original_trace: Original trace content (without line numbers)

        Returns:
            Content with line numbers that match extraction numbering
        """
        original_lines = original_trace.split('\n')

        # Prefix each line with its actual 1-based line number, matching the
        # <Ln> markers that extraction emits.
        numbered_lines = [
            f"<L{line_num}> {line}"
            for line_num, line in enumerate(original_lines, 1)
        ]
        numbered_content = '\n'.join(numbered_lines)

        logger.debug(f"Created extraction-compatible numbering for {len(original_lines)} lines")
        return numbered_content
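    # Example of the numbering this produces (two-line trace, shown for clarity):
    #
    #     "user: hi\nassistant: hello"  ->  "<L1> user: hi\n<L2> assistant: hello"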
    def _get_current_timestamp(self) -> str:
        """Get the current timestamp in ISO format."""
        from datetime import datetime
        return datetime.now().isoformat()
    def get_resolution_statistics(self,
                                  knowledge_graph: Dict[str, Any]) -> Dict[str, Any]:
        """
        Get statistics about content references in a knowledge graph.

        Args:
            knowledge_graph: Knowledge graph to analyze

        Returns:
            Statistics dictionary
        """
        stats = {
            "entities": {
                "total": 0,
                "with_references": 0,
                "with_resolved_content": 0,
            },
            "relations": {
                "total": 0,
                "with_references": 0,
                "with_resolved_content": 0,
            },
        }

        # Analyze entities
        if "entities" in knowledge_graph:
            stats["entities"]["total"] = len(knowledge_graph["entities"])
            for entity_data in knowledge_graph["entities"]:
                if entity_data.get("raw_prompt_ref"):
                    stats["entities"]["with_references"] += 1
                if entity_data.get("raw_prompt"):
                    stats["entities"]["with_resolved_content"] += 1

        # Analyze relations
        if "relations" in knowledge_graph:
            stats["relations"]["total"] = len(knowledge_graph["relations"])
            for relation_data in knowledge_graph["relations"]:
                if relation_data.get("interaction_prompt_ref"):
                    stats["relations"]["with_references"] += 1
                if relation_data.get("interaction_prompt"):
                    stats["relations"]["with_resolved_content"] += 1

        return stats
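

if __name__ == "__main__":
    # Minimal smoke test, a sketch rather than part of the module's contract: it
    # exercises only the numbering helper, since building Entity/Relation objects
    # would require model fields defined outside this file.
    logging.basicConfig(level=logging.DEBUG)
    resolver = ContentReferenceResolver()
    demo_trace = "user: hello\nassistant: hi there"
    print(resolver._create_extraction_compatible_numbering(demo_trace))
    # Expected output:
    #   <L1> user: hello
    #   <L2> assistant: hi there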