# Source: Secure-AI-Agents-Suite / ai_agent_framework / dimensions / context_compression_synthesis.py
# Uploaded by: rajkumarrawal
# Commit: "Initial commit" (2ec0d39)
"""
Context Compression and Synthesis System
=======================================
Advanced context compression and efficient encoding system with synthesis capabilities
for integrating data from diverse sources and modalities within unified contextual models.
"""
import asyncio
import json
import logging
import pickle
import zlib
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple, Union, Set
from dataclasses import dataclass, field
from enum import Enum
import numpy as np
from collections import defaultdict, deque
import hashlib
import base64
from ai_agent_framework.core.context_engineering_agent import (
ContextElement, ContextModality, ContextDimension, ContextVector
)
logger = logging.getLogger(__name__)
class CompressionStrategy(Enum):
    """Names of the compression strategies the compression engine understands."""

    # Concrete strategies: each has a dedicated compression routine.
    HIERARCHICAL = "hierarchical"
    SEMANTIC = "semantic"
    TEMPORAL = "temporal"
    RELEVANCE = "relevance"
    MULTIMODAL = "multimodal"

    # Placeholder: resolved to one of the concrete strategies at compression time.
    ADAPTIVE = "adaptive"
class SynthesisMethod(Enum):
    """Names of the synthesis methods used to merge several context groups."""

    FUSION = "fusion"
    MESH = "mesh"
    HIERARCHICAL = "hierarchical"
    GRAPH_BASED = "graph_based"
    ATTENTION_BASED = "attention_based"
    ENSEMBLE = "ensemble"
@dataclass
class CompressionConfig:
    """Tuning parameters attached to one compression strategy."""

    # Strategy this configuration belongs to.
    strategy: CompressionStrategy
    # Numeric tuning knobs; the built-in presets use values between 0 and 1.
    compression_ratio: float
    quality_threshold: float
    adaptive_threshold: float
    # Size limit for the context (the built-in presets use 30-200).
    max_context_size: int
    # Expiry settings keyed by tier name; the built-in presets store timedeltas.
    expiry_policy: Dict[str, Any]
@dataclass
class SynthesizedContext:
    """Result of synthesizing context elements drawn from several sources."""

    # Identifier; derived from ``source_elements`` when passed in empty.
    id: str
    # Ids of the elements that fed the synthesis.
    source_elements: List[str]
    synthesis_method: SynthesisMethod
    # Method-specific payload produced by the synthesis routine.
    compressed_representation: Dict[str, Any]
    quality_score: float
    confidence: float
    # (start, end) pair of datetimes.
    temporal_scope: Tuple[datetime, datetime]
    semantic_clusters: List[Dict[str, Any]]

    def __post_init__(self):
        """Fill in a content-derived id when the caller supplied a falsy one."""
        if self.id:
            return
        # MD5 serves purely as a fingerprint of the source id list here.
        fingerprint = hashlib.md5(str(self.source_elements).encode())
        self.id = fingerprint.hexdigest()
@dataclass
class ContextVector:
    """Vector representation of a context element for compression.

    NOTE(review): this definition rebinds the ``ContextVector`` name that the
    module also imports from ``ai_agent_framework.core.context_engineering_agent``;
    confirm which of the two is meant to be used.
    """

    element_id: str
    vector: np.ndarray
    vector_type: str  # semantic, temporal, spatial, etc.
    compression_level: int
    quality_score: float
    # Optional: defaults to the creation time (naive UTC) when omitted. The
    # field used to be required, which left the fallback below reachable only
    # when a caller explicitly passed a falsy value.
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        """Stamp the vector with the current UTC time if none was supplied."""
        if not self.timestamp:
            self.timestamp = datetime.utcnow()
class ContextCompressionEngine:
    """Context compression engine offering several interchangeable strategies."""

    def __init__(self):
        """Load the per-strategy presets and create the empty bookkeeping dicts."""
        # Per-strategy tuning presets, keyed by CompressionStrategy.
        self.compression_configs = self._initialize_compression_configs()
        # Bookkeeping containers; they start out empty.
        self.vector_index = {}  # vector-to-element mapping
        self.compression_cache = {}  # cached compressed representations
        self.quality_metrics = {}
def _initialize_compression_configs(self) -> Dict[CompressionStrategy, CompressionConfig]:
    """Build the preset configuration for every concrete compression strategy.

    Returns:
        Mapping from strategy to its CompressionConfig. ADAPTIVE has no entry.
    """
    # Columns: strategy, compression_ratio, quality_threshold,
    #          adaptive_threshold, max_context_size, expiry_policy
    presets = (
        (
            CompressionStrategy.HIERARCHICAL, 0.7, 0.6, 0.5, 100,
            {"level_1": timedelta(minutes=30), "level_2": timedelta(hours=2)},
        ),
        (
            CompressionStrategy.SEMANTIC, 0.8, 0.7, 0.6, 50,
            {"core_concepts": timedelta(hours=6), "supporting": timedelta(hours=1)},
        ),
        (
            CompressionStrategy.TEMPORAL, 0.6, 0.5, 0.4, 200,
            {"immediate": timedelta(minutes=15), "recent": timedelta(hours=4)},
        ),
        (
            CompressionStrategy.RELEVANCE, 0.9, 0.8, 0.7, 30,
            {"high_relevance": timedelta(days=1), "medium": timedelta(hours=6)},
        ),
        (
            CompressionStrategy.MULTIMODAL, 0.5, 0.6, 0.5, 150,
            {"text": timedelta(hours=2), "visual": timedelta(minutes=30)},
        ),
    )
    return {
        strategy: CompressionConfig(
            strategy=strategy,
            compression_ratio=ratio,
            quality_threshold=quality,
            adaptive_threshold=adaptive,
            max_context_size=max_size,
            expiry_policy=expiry,
        )
        for strategy, ratio, quality, adaptive, max_size, expiry in presets
    }
async def compress_context(
    self,
    context_elements: List[ContextElement],
    strategy: CompressionStrategy = CompressionStrategy.ADAPTIVE,
    quality_threshold: float = 0.6
) -> Dict[str, Any]:
    """Compress ``context_elements`` with the requested strategy.

    Args:
        context_elements: Elements to compress.
        strategy: Strategy to apply. ADAPTIVE picks one from the elements;
            a strategy without a preset falls back to HIERARCHICAL.
        quality_threshold: Threshold forwarded to the strategy routine.

    Returns:
        The strategy routine's result dict, or
        ``{"status": "error", "error": <message>}`` if anything raised.
    """
    try:
        chosen = strategy
        if chosen == CompressionStrategy.ADAPTIVE:
            chosen = await self._select_optimal_strategy(context_elements)

        config = self.compression_configs.get(chosen)
        if not config:
            chosen = CompressionStrategy.HIERARCHICAL
            config = self.compression_configs[chosen]

        routines = {
            CompressionStrategy.HIERARCHICAL: self._hierarchical_compression,
            CompressionStrategy.SEMANTIC: self._semantic_compression,
            CompressionStrategy.TEMPORAL: self._temporal_compression,
            CompressionStrategy.RELEVANCE: self._relevance_compression,
            CompressionStrategy.MULTIMODAL: self._multimodal_compression,
        }
        # Anything unrecognised is handled hierarchically.
        routine = routines.get(chosen, self._hierarchical_compression)
        return await routine(context_elements, config, quality_threshold)
    except Exception as e:
        logger.error(f"Context compression failed: {e}")
        return {"status": "error", "error": str(e)}
async def _select_optimal_strategy(self, context_elements: List[ContextElement]) -> CompressionStrategy:
    """Pick a concrete strategy from the characteristics of the elements.

    Checks are applied in order: more than three modalities -> MULTIMODAL,
    timestamps spanning more than two hours -> TEMPORAL, relevance variance
    above 0.1 -> RELEVANCE, otherwise SEMANTIC. An empty input yields
    HIERARCHICAL.
    """
    if not context_elements:
        return CompressionStrategy.HIERARCHICAL

    # Gather all three characteristics up front.
    distinct_modalities = {item.modality for item in context_elements}
    spread = self._calculate_temporal_spread(context_elements)
    variance = np.var([item.relevance_score for item in context_elements])

    if len(distinct_modalities) > 3:
        return CompressionStrategy.MULTIMODAL
    if spread > timedelta(hours=2):
        return CompressionStrategy.TEMPORAL
    if variance > 0.1:
        return CompressionStrategy.RELEVANCE
    return CompressionStrategy.SEMANTIC
def _calculate_temporal_spread(self, context_elements: List[ContextElement]) -> timedelta:
    """Return the gap between the earliest and latest element timestamps.

    Fewer than two elements have no spread, so a zero timedelta is returned.
    """
    if len(context_elements) < 2:
        return timedelta(0)
    stamps = [item.timestamp for item in context_elements]
    latest = max(stamps)
    earliest = min(stamps)
    return latest - earliest
async def _hierarchical_compression(
    self,
    context_elements: List[ContextElement],
    config: CompressionConfig,
    quality_threshold: float
) -> Dict[str, Any]:
    """Compress elements into core / supporting / peripheral tiers.

    Tiering:
        * core: relevance > 0.8 and confidence > 0.7
        * supporting: relevance > 0.5 that did not qualify as core
        * peripheral: everything else

    Every element lands in exactly one tier. Previously an element with
    relevance > 0.8 but confidence <= 0.7 matched no tier and silently
    disappeared from the result; it is now treated as supporting.

    Args:
        context_elements: Elements to compress.
        config: Preset for the strategy (not consulted by this routine).
        quality_threshold: Echoed back in the result.

    Returns:
        Dict with the compressed tiers, element counts and
        ``compression_ratio`` = compressed count / original count (0 for an
        empty input), matching the other strategies.
    """
    core_context = []
    supporting_context = []
    peripheral_context = []
    for element in context_elements:
        if element.relevance_score > 0.8 and element.confidence > 0.7:
            core_context.append(element)
        elif element.relevance_score > 0.5:
            # Includes highly relevant but low-confidence elements.
            supporting_context.append(element)
        else:
            peripheral_context.append(element)

    compressed_core = await self._compress_element_group(core_context, 0.9, "core")
    compressed_supporting = await self._compress_element_group(supporting_context, 0.7, "supporting")
    compressed_peripheral = await self._compress_element_group(peripheral_context, 0.5, "peripheral")

    compressed_count = (
        len(compressed_core) + len(compressed_supporting) + len(compressed_peripheral)
    )
    return {
        "strategy": "hierarchical",
        "levels": {
            "core": compressed_core,
            "supporting": compressed_supporting,
            "peripheral": compressed_peripheral
        },
        # A true ratio; the raw count is reported under "compressed_elements".
        "compression_ratio": compressed_count / max(len(context_elements), 1),
        "quality_threshold": quality_threshold,
        "total_elements": len(context_elements),
        "compressed_elements": compressed_count
    }
async def _semantic_compression(
    self,
    context_elements: List[ContextElement],
    config: CompressionConfig,
    quality_threshold: float
) -> Dict[str, Any]:
    """Compress elements theme by theme.

    Elements are grouped by semantic theme, each group is compressed on its
    own, and the per-group results are collected together with an overall
    compression ratio and coherence score.
    """
    themed_groups = await self._group_by_semantic_similarity(context_elements)

    packed_groups = []
    original_count = 0
    kept_count = 0
    for entry in themed_groups:
        members = entry["elements"]
        theme = entry["theme"]
        packed = await self._compress_semantic_group(members, theme, quality_threshold)

        kept = packed["compressed_elements"]
        original_count += len(members)
        kept_count += len(kept)
        packed_groups.append({
            "theme": theme,
            "elements": kept,
            "key_concepts": packed["key_concepts"],
            "representative_embedding": packed["representative_embedding"],
            "confidence": packed["confidence"]
        })

    ratio = kept_count / max(original_count, 1)
    return {
        "strategy": "semantic",
        "groups": packed_groups,
        "compression_ratio": ratio,
        "quality_threshold": quality_threshold,
        "semantic_coherence": self._calculate_semantic_coherence(packed_groups)
    }
async def _temporal_compression(
    self,
    context_elements: List[ContextElement],
    config: CompressionConfig,
    quality_threshold: float
) -> Dict[str, Any]:
    """Compress elements by age bucket.

    Elements are sorted by timestamp and assigned to "immediate" (last 30
    minutes), "recent" (last 4 hours) or "historical" relative to the current
    naive UTC time. Only non-empty buckets are compressed.
    """
    chronological = sorted(context_elements, key=lambda e: e.timestamp)

    now = datetime.utcnow()
    immediate_cutoff = now - timedelta(minutes=30)
    recent_cutoff = now - timedelta(hours=4)

    buckets = {"immediate": [], "recent": [], "historical": []}
    for item in chronological:
        if item.timestamp >= immediate_cutoff:
            bucket = "immediate"
        elif item.timestamp >= recent_cutoff:
            bucket = "recent"
        else:
            bucket = "historical"
        buckets[bucket].append(item)

    compressed = {}
    for bucket, members in buckets.items():
        if not members:
            continue
        compressed[bucket] = await self._compress_temporal_window(
            members, bucket, quality_threshold
        )

    distribution = {bucket: len(members) for bucket, members in buckets.items()}
    kept_total = sum(
        len(window.get("compressed_elements", [])) for window in compressed.values()
    )
    return {
        "strategy": "temporal",
        "windows": compressed,
        "temporal_distribution": distribution,
        "compression_ratio": kept_total / max(len(context_elements), 1)
    }
async def _relevance_compression(
    self,
    context_elements: List[ContextElement],
    config: CompressionConfig,
    quality_threshold: float
) -> Dict[str, Any]:
    """Compress elements with a loss level chosen by relevance tier.

    Tiers: high (> 0.8), medium (0.5 < score <= 0.8), low (<= 0.5). Each tier
    keeps descending relevance order and goes through its own routine.
    """
    ranked = sorted(context_elements, key=lambda e: e.relevance_score, reverse=True)

    high_tier, medium_tier, low_tier = [], [], []
    for item in ranked:
        score = item.relevance_score
        if score > 0.8:
            high_tier.append(item)
        elif 0.5 < score <= 0.8:
            medium_tier.append(item)
        elif score <= 0.5:
            low_tier.append(item)

    packed_high = await self._compress_with_minimal_loss(high_tier, "high_relevance")
    packed_medium = await self._compress_with_standard_loss(medium_tier, "medium_relevance")
    packed_low = await self._compress_aggressively(low_tier, "low_relevance")

    kept_total = len(packed_high) + len(packed_medium) + len(packed_low)
    return {
        "strategy": "relevance",
        "relevance_tiers": {
            "high": packed_high,
            "medium": packed_medium,
            "low": packed_low
        },
        "preservation_rate": {
            "high": len(packed_high) / max(len(high_tier), 1),
            "medium": len(packed_medium) / max(len(medium_tier), 1),
            "low": len(packed_low) / max(len(low_tier), 1)
        },
        "compression_ratio": kept_total / max(len(context_elements), 1)
    }
async def _multimodal_compression(
    self,
    context_elements: List[ContextElement],
    config: CompressionConfig,
    quality_threshold: float
) -> Dict[str, Any]:
    """Compress elements modality by modality and report cross-modal correlations.

    The per-modality results are keyed by the modality's ``value``.
    """
    by_modality = defaultdict(list)
    for item in context_elements:
        by_modality[item.modality].append(item)

    packed_by_modality = {}
    original_total = 0
    kept_total = 0
    for modality, members in by_modality.items():
        packed = await self._compress_modality_group(members, modality, quality_threshold)
        packed_by_modality[modality.value] = packed
        original_total += len(members)
        kept_total += len(packed["compressed_elements"])

    correlations = await self._analyze_cross_modal_correlations(by_modality)
    return {
        "strategy": "multimodal",
        "modalities": packed_by_modality,
        "cross_modal_correlations": correlations,
        "compression_ratio": kept_total / max(original_total, 1)
    }
async def _compress_element_group(
    self,
    elements: List[ContextElement],
    compression_level: float,
    group_type: str
) -> List[Dict[str, Any]]:
    """Turn one tier of elements into its compressed dict form.

    Args:
        elements: Elements of the tier.
        compression_level: Accepted for the caller's benefit; not consulted here.
        group_type: "core" keeps content and metadata in full, "supporting"
            keeps a summary plus key points, anything else keeps only a theme
            and the relevance score.

    Returns:
        One dict per input element, in input order.
    """
    if not elements:
        return []

    if group_type == "core":
        # Core elements are preserved in full detail.
        return [
            {
                "id": item.id,
                "content": item.content,
                "metadata": item.metadata,
                "full_representation": True
            }
            for item in elements
        ]

    if group_type == "supporting":
        # Supporting elements are summarized.
        return [
            {
                "id": item.id,
                "summary": self._generate_summary(item.content),
                "key_points": item.metadata.get("key_points", []),
                "confidence": item.confidence
            }
            for item in elements
        ]

    # Peripheral elements are reduced to an abstract theme.
    return [
        {
            "id": item.id,
            "theme": item.metadata.get("theme", str(item.content)[:50]),
            "relevance_score": item.relevance_score,
            "abstract": True
        }
        for item in elements
    ]
async def _group_by_semantic_similarity(self, context_elements: List[ContextElement]) -> List[Dict[str, Any]]:
    """Bucket elements by extracted theme.

    Returns:
        One dict per theme (in first-seen order) with the theme name, its
        elements and the group's coherence score.
    """
    by_theme = defaultdict(list)
    for item in context_elements:
        by_theme[self._extract_semantic_theme(item)].append(item)

    return [
        {
            "theme": theme,
            "elements": members,
            "coherence_score": self._calculate_group_coherence(members)
        }
        for theme, members in by_theme.items()
    ]
def _extract_semantic_theme(self, element: ContextElement) -> str:
    """Classify an element into a coarse theme by keyword matching.

    The element's content is stringified and lower-cased, then checked for
    keywords as plain substrings. Themes are tried in the order listed below;
    the first theme with a matching keyword wins.

    Returns:
        "urgency", "collaboration", "data_analysis", "customer_oriented",
        or "general" when nothing matches.
    """
    text = str(element.content).lower()

    # Ordered: earlier themes take precedence over later ones.
    theme_keywords = (
        ("urgency", ("urgent", "asap", "deadline")),
        ("collaboration", ("team", "collaborate", "group")),
        ("data_analysis", ("data", "analysis", "metrics")),
        ("customer_oriented", ("customer", "client", "user")),
    )
    for theme, keywords in theme_keywords:
        if any(keyword in text for keyword in keywords):
            return theme
    return "general"
def _calculate_group_coherence(self, elements: List[ContextElement]) -> float:
    """Score how alike the elements of a group are.

    Similarity of a pair is the Jaccard overlap of their metadata key sets;
    the result is the mean over all unordered pairs. Groups with fewer than
    two elements score 1.0.
    """
    if len(elements) < 2:
        return 1.0

    key_sets = [set(item.metadata.keys()) for item in elements]
    pair_scores = []
    for position, left_keys in enumerate(key_sets):
        for right_keys in key_sets[position + 1:]:
            shared = len(left_keys & right_keys)
            combined = len(left_keys | right_keys)
            pair_scores.append(shared / max(combined, 1))

    if not pair_scores:
        return 0.5
    return np.mean(pair_scores)
def _generate_summary(self, content: Any) -> str:
"""Generate summary of content."""
content_str = str(content)
if len(content_str) <= 100:
return content_str
# Simple summarization
sentences = content_str.split('.')
if len(sentences) > 2:
return '. '.join(sentences[:2]) + '.'
else:
return content_str[:97] + "..."
async def _compress_semantic_group(
    self,
    elements: List[ContextElement],
    theme: str,
    quality_threshold: float
) -> Dict[str, Any]:
    """Compress one themed group down to its top representatives.

    Keeps summaries of up to five elements ranked by relevance x confidence,
    plus the group's key concepts, a representative embedding (as a plain
    list when it is a NumPy array) and the mean confidence of all elements.
    """
    concepts = await self._extract_key_concepts(elements)
    embedding = await self._generate_representative_embedding(elements)

    ranked = sorted(
        elements,
        key=lambda e: e.relevance_score * e.confidence,
        reverse=True
    )
    keep_count = min(5, len(elements))
    representatives = ranked[:keep_count]

    summaries = [self._element_to_summary(item) for item in representatives]
    if isinstance(embedding, np.ndarray):
        embedding_payload = embedding.tolist()
    else:
        embedding_payload = embedding

    return {
        "theme": theme,
        "compressed_elements": summaries,
        "key_concepts": concepts,
        "representative_embedding": embedding_payload,
        "confidence": np.mean([item.confidence for item in elements])
    }
async def _extract_key_concepts(self, elements: List[ContextElement]) -> List[str]:
    """Return up to ten highest-scoring concepts for a group of elements.

    Scoring: every whitespace-separated content word longer than three
    characters earns the element's relevance score; every string metadata
    value (lower-cased) earns half the element's confidence.
    """
    scores = defaultdict(float)

    for item in elements:
        text = str(item.content).lower()
        metadata = item.metadata

        # Content words.
        for token in text.split():
            if len(token) > 3:
                scores[token] += item.relevance_score

        # String-valued metadata.
        for value in metadata.values():
            if isinstance(value, str):
                scores[value.lower()] += item.confidence * 0.5

    ranked = sorted(scores.items(), key=lambda pair: pair[1], reverse=True)
    return [name for name, _ in ranked[:10]]
async def _generate_representative_embedding(self, elements: List[ContextElement]) -> np.ndarray:
    """Return a 128-dimensional representative embedding for a group.

    Each element contributes a mock embedding (uniform random values derived
    from its id) scaled by its relevance score; the result is the
    element-wise mean. An empty group yields a zero vector.

    The per-element generator is seeded from a SHA-256 digest of the id
    rather than the built-in ``hash()``: string hashes are randomised per
    interpreter process, so the old seeds were not reproducible across runs.
    A local generator is used so NumPy's global random state is left alone.
    """
    if not elements:
        return np.zeros(128)

    weighted = []
    for element in elements:
        digest = hashlib.sha256(str(element.id).encode("utf-8")).digest()
        seed = int.from_bytes(digest[:8], "big")
        mock_embedding = np.random.default_rng(seed).random(128)
        weighted.append(mock_embedding * element.relevance_score)

    # Simple averaging; a production system would use real embeddings.
    return np.mean(weighted, axis=0)
def _element_to_summary(self, element: ContextElement) -> Dict[str, Any]:
    """Build the summary dict for an element.

    Carries the id, a content summary, both scores and the first three
    metadata entries (in the metadata's iteration order).
    """
    summary = {"id": element.id}
    summary["content_summary"] = self._generate_summary(element.content)
    summary["relevance_score"] = element.relevance_score
    summary["confidence"] = element.confidence
    summary["key_metadata"] = dict(list(element.metadata.items())[:3])
    return summary
async def _compress_temporal_window(
    self,
    elements: List[ContextElement],
    window_name: str,
    quality_threshold: float
) -> Dict[str, Any]:
    """Compress the elements of one age bucket.

    The compression level is 0.9 for "immediate", 0.7 for "recent" and 0.4
    otherwise. Only elements whose relevance exceeds ``quality_threshold``
    are kept; at levels above 0.8 they get the full summary dict, otherwise
    a reduced one. ``elements`` must be non-empty because the time range is
    taken from its min/max timestamps.
    """
    level_by_window = {"immediate": 0.9, "recent": 0.7}
    compression_level = level_by_window.get(window_name, 0.4)
    keep_detail = compression_level > 0.8

    kept = []
    for item in elements:
        if not (item.relevance_score > quality_threshold):
            continue
        if keep_detail:
            kept.append(self._element_to_summary(item))
        else:
            kept.append({
                "id": item.id,
                "content_summary": self._generate_summary(item.content),
                "relevance_score": item.relevance_score
            })

    window_start = min(e.timestamp for e in elements).isoformat()
    window_end = max(e.timestamp for e in elements).isoformat()
    return {
        "window_name": window_name,
        "compression_level": compression_level,
        "compressed_elements": kept,
        "time_range": {
            "start": window_start,
            "end": window_end
        }
    }
async def _compress_with_minimal_loss(self, elements: List[ContextElement], tier: str) -> List[Dict[str, Any]]:
    """Return the full summary dict of every element; ``tier`` is not consulted."""
    return list(map(self._element_to_summary, elements))
async def _compress_with_standard_loss(self, elements: List[ContextElement], tier: str) -> List[Dict[str, Any]]:
    """Return id, content summary and both scores per element; ``tier`` is not consulted."""
    return [
        {
            "id": item.id,
            "summary": self._generate_summary(item.content),
            "relevance_score": item.relevance_score,
            "confidence": item.confidence
        }
        for item in elements
    ]
async def _compress_aggressively(self, elements: List[ContextElement], tier: str) -> List[Dict[str, Any]]:
    """Reduce each element to id, theme, relevance and confidence rounded to 2 places.

    ``tier`` is not consulted.
    """
    return [
        {
            "id": item.id,
            "theme": self._extract_semantic_theme(item),
            "relevance_score": item.relevance_score,
            "confidence": round(item.confidence, 2)
        }
        for item in elements
    ]
async def _compress_modality_group(
    self,
    elements: List[ContextElement],
    modality: ContextModality,
    quality_threshold: float
) -> Dict[str, Any]:
    """Route a single-modality group to its modality-specific compressor.

    TEXT, VISUAL and AUDITORY have dedicated routines; every other modality
    goes through the generic one.
    """
    if modality == ContextModality.TEXT:
        compressor = self._compress_text_modality
    elif modality == ContextModality.VISUAL:
        compressor = self._compress_visual_modality
    elif modality == ContextModality.AUDITORY:
        compressor = self._compress_auditory_modality
    else:
        compressor = self._compress_generic_modality
    return await compressor(elements, quality_threshold)
async def _compress_text_modality(self, elements: List[ContextElement], quality_threshold: float) -> Dict[str, Any]:
    """Summarize the text elements whose relevance exceeds ``quality_threshold``."""
    kept = [
        {
            "id": item.id,
            "content_type": "text",
            "content": self._generate_summary(item.content),
            "metadata": item.metadata
        }
        for item in elements
        if item.relevance_score > quality_threshold
    ]
    return {
        "modality": "text",
        "compressed_elements": kept,
        "compression_method": "text_summarization"
    }
async def _compress_visual_modality(self, elements: List[ContextElement], quality_threshold: float) -> Dict[str, Any]:
    """Reduce visual elements above the threshold to a descriptor and a few metadata keys.

    The descriptor comes from ``metadata["visual_descriptor"]`` or, failing
    that, the first 50 characters of the stringified content. Only the
    color / shape / size / location metadata entries are retained.
    """
    retained_keys = ["color", "shape", "size", "location"]
    kept = []
    for item in elements:
        if not (item.relevance_score > quality_threshold):
            continue
        descriptor = item.metadata.get("visual_descriptor", str(item.content)[:50])
        trimmed = {k: v for k, v in item.metadata.items() if k in retained_keys}
        kept.append({
            "id": item.id,
            "content_type": "visual",
            "visual_descriptor": descriptor,
            "metadata": trimmed
        })
    return {
        "modality": "visual",
        "compressed_elements": kept,
        "compression_method": "visual_feature_extraction"
    }
async def _compress_auditory_modality(self, elements: List[ContextElement], quality_threshold: float) -> Dict[str, Any]:
    """Reduce auditory elements above the threshold to a descriptor and a few metadata keys.

    The descriptor comes from ``metadata["audio_descriptor"]`` or, failing
    that, the first 50 characters of the stringified content. Only the
    frequency / amplitude / duration metadata entries are retained.
    """
    retained_keys = ["frequency", "amplitude", "duration"]
    kept = []
    for item in elements:
        if not (item.relevance_score > quality_threshold):
            continue
        descriptor = item.metadata.get("audio_descriptor", str(item.content)[:50])
        trimmed = {k: v for k, v in item.metadata.items() if k in retained_keys}
        kept.append({
            "id": item.id,
            "content_type": "auditory",
            "audio_descriptor": descriptor,
            "metadata": trimmed
        })
    return {
        "modality": "auditory",
        "compressed_elements": kept,
        "compression_method": "audio_feature_extraction"
    }
async def _compress_generic_modality(self, elements: List[ContextElement], quality_threshold: float) -> Dict[str, Any]:
    """Summarize elements of any other modality whose relevance exceeds the threshold."""
    kept = [
        {
            "id": item.id,
            "content_summary": self._generate_summary(item.content),
            "relevance_score": item.relevance_score,
            "confidence": item.confidence
        }
        for item in elements
        if item.relevance_score > quality_threshold
    ]
    return {
        "modality": "generic",
        "compressed_elements": kept,
        "compression_method": "generic_compression"
    }
async def _analyze_cross_modal_correlations(self, modality_groups: Dict[ContextModality, List[ContextElement]]) -> Dict[str, Any]:
    """Score every unordered pair of modalities.

    Returns:
        Dict keyed "<first.value>_<second.value>" holding the temporal
        correlation, the relevance correlation and their average.
    """
    ordered_modalities = list(modality_groups.keys())
    report = {}

    for position, first in enumerate(ordered_modalities):
        for second in ordered_modalities[position + 1:]:
            first_members = modality_groups[first]
            second_members = modality_groups[second]

            temporal = self._calculate_temporal_correlation(first_members, second_members)
            relevance = self._calculate_relevance_correlation(first_members, second_members)

            pair_key = f"{first.value}_{second.value}"
            report[pair_key] = {
                "temporal_correlation": temporal,
                "relevance_correlation": relevance,
                "correlation_strength": (temporal + relevance) / 2
            }

    return report
def _calculate_temporal_correlation(self, elements1: List[ContextElement], elements2: List[ContextElement]) -> float:
    """Score how close in time two sets of elements are.

    For each element of ``elements1`` the distance (in seconds) to the
    nearest timestamp in ``elements2`` is taken; the mean distance is mapped
    to a score of ``1 - mean / 3600`` clamped at zero, so a mean gap of an
    hour or more scores 0. Either set being empty also scores 0.0.
    """
    if not (elements1 and elements2):
        return 0.0

    first_stamps = [e.timestamp for e in elements1]
    second_stamps = [e.timestamp for e in elements2]

    # Accumulate manually so the floating-point summation order is fixed.
    gap_total = 0
    samples = 0
    for stamp in first_stamps:
        gap_total += min(abs((stamp - other).total_seconds()) for other in second_stamps)
        samples += 1

    if samples == 0:
        return 0.0

    mean_gap = gap_total / samples
    # Normalised by one hour.
    return max(0, 1 - mean_gap / 3600)
def _calculate_relevance_correlation(self, elements1: List[ContextElement], elements2: List[ContextElement]) -> float:
    """Compare the spread of relevance scores between two sets of elements.

    Returns 0.0 if either set is empty. When both sets have more than one
    element and both variances are positive, returns the ratio of the
    smaller variance to the larger one. Otherwise returns 0.5.
    """
    if not (elements1 and elements2):
        return 0.0

    scores_a = [e.relevance_score for e in elements1]
    scores_b = [e.relevance_score for e in elements2]

    default_score = 0.5  # moderate correlation when variances are unusable
    if len(scores_a) <= 1 or len(scores_b) <= 1:
        return default_score

    spread_a = np.var(scores_a)
    spread_b = np.var(scores_b)
    if spread_a > 0 and spread_b > 0:
        return min(spread_a, spread_b) / max(spread_a, spread_b)
    return default_score
def _calculate_semantic_coherence(self, compressed_groups: List[Dict[str, Any]]) -> float:
"""Calculate overall semantic coherence of compressed context."""
if not compressed_groups:
return 0.0
coherence_scores = [group.get("confidence", 0.5) for group in compressed_groups]
return np.mean(coherence_scores)
class ContextSynthesisEngine:
    """Context synthesis engine that merges element groups from diverse sources."""

    def __init__(self):
        """Register one synthesis routine per SynthesisMethod and create an empty cache."""
        registry = [
            (SynthesisMethod.FUSION, self._fusion_synthesis),
            (SynthesisMethod.MESH, self._mesh_synthesis),
            (SynthesisMethod.HIERARCHICAL, self._hierarchical_synthesis),
            (SynthesisMethod.GRAPH_BASED, self._graph_based_synthesis),
            (SynthesisMethod.ATTENTION_BASED, self._attention_based_synthesis),
            (SynthesisMethod.ENSEMBLE, self._ensemble_synthesis),
        ]
        # Lookup table used to dispatch a synthesis request by method.
        self.synthesis_methods = dict(registry)
        self.synthesis_cache = {}
async def synthesize_contexts(
    self,
    context_groups: List[List[ContextElement]],
    synthesis_method: SynthesisMethod = SynthesisMethod.FUSION,
    quality_threshold: float = 0.6
) -> SynthesizedContext:
    """Synthesize several groups of context elements into one SynthesizedContext.

    Args:
        context_groups: Groups of elements, one list per source.
        synthesis_method: Method to use; an unregistered method falls back
            to FUSION.
        quality_threshold: Threshold forwarded to the synthesis routine.

    Returns:
        The synthesized context. If anything raises, the error is logged and
        a placeholder context is returned with an ``error_<epoch>`` id, zero
        quality/confidence and the message under
        ``compressed_representation["error"]``.
    """
    # Imported here because the module's import block never brings in
    # ``time``; without it both id expressions below raised NameError, so the
    # method could neither succeed nor return its error placeholder.
    import time

    try:
        if synthesis_method not in self.synthesis_methods:
            synthesis_method = SynthesisMethod.FUSION
        synthesis_func = self.synthesis_methods[synthesis_method]
        result = await synthesis_func(context_groups, quality_threshold)

        # Wrap the raw result with provenance and quality information.
        synthesized_context = SynthesizedContext(
            id=f"synthesized_{int(time.time())}",
            source_elements=[elem.id for group in context_groups for elem in group],
            synthesis_method=synthesis_method,
            compressed_representation=result,
            quality_score=self._calculate_synthesis_quality(result),
            confidence=self._calculate_synthesis_confidence(result),
            temporal_scope=self._calculate_temporal_scope(context_groups),
            semantic_clusters=self._identify_semantic_clusters(result)
        )
        return synthesized_context
    except Exception as e:
        logger.error(f"Context synthesis failed: {e}")
        failed_at = datetime.utcnow()
        return SynthesizedContext(
            id=f"error_{int(time.time())}",
            source_elements=[],
            synthesis_method=synthesis_method,
            compressed_representation={"error": str(e)},
            quality_score=0.0,
            confidence=0.0,
            temporal_scope=(failed_at, failed_at),
            semantic_clusters=[]
        )
async def _fusion_synthesis(
    self,
    context_groups: List[List[ContextElement]],
    quality_threshold: float
) -> Dict[str, Any]:
    """Fuse all groups into one ranked list of elements.

    Steps: pool every element, drop duplicates, rank by relevance x
    confidence, keep those whose product exceeds ``quality_threshold`` and
    emit at most the top 50.
    """
    pooled = [item for group in context_groups for item in group]
    distinct = self._remove_duplicates(pooled)

    ranked = sorted(
        distinct,
        key=lambda e: e.relevance_score * e.confidence,
        reverse=True
    )
    qualified = [
        item for item in ranked
        if item.relevance_score * item.confidence > quality_threshold
    ]

    # Only the 50 best-ranked elements make it into the fused output.
    fused = [
        {
            "id": item.id,
            "content": item.content,
            "source_group": self._identify_source_group(item, context_groups),
            "relevance_score": item.relevance_score,
            "confidence": item.confidence,
            "metadata": item.metadata
        }
        for item in qualified[:50]
    ]

    return {
        "method": "fusion",
        "elements": fused,
        "fusion_score": len(fused) / max(len(distinct), 1),
        "quality_distribution": self._calculate_quality_distribution(qualified)
    }
async def _mesh_synthesis(
    self,
    context_groups: List[List[ContextElement]],
    quality_threshold: float
) -> Dict[str, Any]:
    """Build a mesh (nodes plus cross-group edges) from the context groups.

    Nodes are the elements whose relevance exceeds ``quality_threshold``;
    edges are the cross-group connections whose source and target are both
    nodes.

    Returns:
        Dict with the nodes, edges, mesh density (edges divided by the
        number of possible node pairs, minimum divisor 1) and clustering
        coefficient.
    """
    connections = self._create_cross_group_connections(context_groups)

    mesh_nodes = [
        {
            "id": element.id,
            "content": element.content,
            "group": group_idx,
            "relevance_score": element.relevance_score
        }
        for group_idx, group in enumerate(context_groups)
        for element in group
        if element.relevance_score > quality_threshold
    ]

    # Build the id lookup once instead of rebuilding two id lists for every
    # connection. Element ids are used as dict keys elsewhere in this class,
    # so they are assumed hashable.
    node_ids = {node["id"] for node in mesh_nodes}
    mesh_edges = [
        connection for connection in connections
        if connection["source"] in node_ids and connection["target"] in node_ids
    ]

    possible_pairs = len(mesh_nodes) * (len(mesh_nodes) - 1) / 2
    return {
        "method": "mesh",
        "nodes": mesh_nodes,
        "edges": mesh_edges,
        "mesh_density": len(mesh_edges) / max(possible_pairs, 1),
        "clustering_coefficient": self._calculate_clustering_coefficient(mesh_edges, mesh_nodes)
    }
async def _hierarchical_synthesis(
    self,
    context_groups: List[List[ContextElement]],
    quality_threshold: float
) -> Dict[str, Any]:
    """Arrange qualifying elements into three importance levels.

    Only elements whose relevance exceeds ``quality_threshold`` are placed:
        * level_1: present in more than one group
        * level_2: relevance above 0.7
        * level_3: the rest, with content cut to 100 characters
    """
    level_1, level_2, level_3 = [], [], []
    presence = self._analyze_cross_group_presence(context_groups)

    for group_idx, group in enumerate(context_groups):
        for item in group:
            if not (item.relevance_score > quality_threshold):
                continue

            occurrences = presence.get(item.id, 0)
            if occurrences > 1:
                # Shared across groups: highest importance.
                level_1.append({
                    "id": item.id,
                    "content": item.content,
                    "groups": self._find_element_groups(item, context_groups),
                    "cross_group_relevance": occurrences,
                    "relevance_score": item.relevance_score
                })
            elif item.relevance_score > 0.7:
                level_2.append({
                    "id": item.id,
                    "content": item.content,
                    "group": group_idx,
                    "relevance_score": item.relevance_score
                })
            else:
                level_3.append({
                    "id": item.id,
                    "content_summary": str(item.content)[:100],
                    "group": group_idx,
                    "relevance_score": item.relevance_score
                })

    hierarchy = {"level_1": level_1, "level_2": level_2, "level_3": level_3}
    return {
        "method": "hierarchical",
        "hierarchy": hierarchy,
        "distribution": {level: len(entries) for level, entries in hierarchy.items()}
    }
async def _graph_based_synthesis(
self,
context_groups: List[List[ContextElement]],
quality_threshold: float
) -> Dict[str, Any]:
"""Perform graph-based context synthesis."""
# Build knowledge graph
nodes = []
edges = []
# Add nodes
for group_idx, group in enumerate(context_groups):
for element in group:
if element.relevance_score > quality_threshold:
nodes.append({
"id": element.id,
"content": element.content,
"group": group_idx,
"relevance_score": element.relevance_score,
"metadata": element.metadata
})
# Add edges based on semantic relationships
for i, node1 in enumerate(nodes):
for j, node2 in enumerate(nodes[i+1:], i+1):
relationship_strength = self._calculate_relationship_strength(node1, node2)
if relationship_strength > 0.3:
edges.append({
"source": node1["id"],
"target": node2["id"],
"strength": relationship_strength,
"relationship_type": "semantic"
})
# Calculate graph metrics
graph_metrics = self._calculate_graph_metrics(nodes, edges)
return {
"method": "graph_based",
"nodes": nodes,
"edges": edges,
"graph_metrics": graph_metrics,
"community_structure": self._detect_communities(nodes, edges)
}
async def _attention_based_synthesis(
self,
context_groups: List[List[ContextElement]],
quality_threshold: float
) -> Dict[str, Any]:
"""Perform attention-based context synthesis."""
# Calculate attention weights for each element
attention_weights = {}
total_elements = 0
for group_idx, group in enumerate(context_groups):
for element in group:
if element.relevance_score > quality_threshold:
# Calculate attention weight based on multiple factors
relevance_weight = element.relevance_score
confidence_weight = element.confidence
recency_weight = self._calculate_recency_weight(element.timestamp)
diversity_weight = self._calculate_diversity_weight(element, context_groups)
attention_score = (relevance_weight * 0.4 +
confidence_weight * 0.3 +
recency_weight * 0.2 +
diversity_weight * 0.1)
attention_weights[element.id] = attention_score
total_elements += 1
# Normalize attention weights
if attention_weights:
max_weight = max(attention_weights.values())
attention_weights = {k: v / max_weight for k, v in attention_weights.items()}
# Create attention-based representation
attended_elements = []
for group_idx, group in enumerate(context_groups):
for element in group:
if element.id in attention_weights and attention_weights[element.id] > 0.1:
attended_elements.append({
"id": element.id,
"content": element.content,
"group": group_idx,
"attention_weight": attention_weights[element.id],
"relevance_score": element.relevance_score
})
return {
"method": "attention_based",
"attended_elements": attended_elements,
"attention_distribution": self._calculate_attention_distribution(attention_weights),
"attention_entropy": self._calculate_attention_entropy(attention_weights)
}
async def _ensemble_synthesis(
    self,
    context_groups: List[List[ContextElement]],
    quality_threshold: float
) -> Dict[str, Any]:
    """Run several synthesis methods and merge their outputs.

    The fusion, hierarchical and attention-based methods are looked up in
    ``self.synthesis_methods`` and awaited one after another. A method
    that raises is logged as a warning and left out of the ensemble.

    Returns:
        Dict with ``method``, the per-method ``individual_results``, the
        merged ``ensemble_result`` and a ``method_agreement`` score.
    """
    ensemble_members = (
        SynthesisMethod.FUSION,
        SynthesisMethod.HIERARCHICAL,
        SynthesisMethod.ATTENTION_BASED,
    )

    # Results keyed by the method's string value; failed methods are absent
    results_by_method = {}
    for method in ensemble_members:
        try:
            results_by_method[method.value] = await self.synthesis_methods[method](
                context_groups, quality_threshold
            )
        except Exception as e:
            logger.warning(f"Ensemble synthesis method {method.value} failed: {e}")

    merged = self._combine_synthesis_results(results_by_method)
    agreement = self._calculate_method_agreement(results_by_method)

    return {
        "method": "ensemble",
        "individual_results": results_by_method,
        "ensemble_result": merged,
        "method_agreement": agreement
    }
def _remove_duplicates(self, elements: List[ContextElement]) -> List[ContextElement]:
"""Remove duplicate elements based on content similarity."""
unique_elements = []
seen_content_hashes = set()
for element in elements:
content_hash = hashlib.md5(str(element.content).encode()).hexdigest()
if content_hash not in seen_content_hashes:
seen_content_hashes.add(content_hash)
unique_elements.append(element)
return unique_elements
def _identify_source_group(self, element: ContextElement, context_groups: List[List[ContextElement]]) -> int:
"""Identify which group an element belongs to."""
for group_idx, group in enumerate(context_groups):
if element in group:
return group_idx
return -1
def _find_element_groups(self, element: ContextElement, context_groups: List[List[ContextElement]]) -> List[int]:
"""Find all groups an element appears in."""
groups = []
for group_idx, group in enumerate(context_groups):
if element in group:
groups.append(group_idx)
return groups
def _calculate_quality_distribution(self, elements: List[ContextElement]) -> Dict[str, float]:
"""Calculate quality score distribution."""
if not elements:
return {}
relevance_scores = [e.relevance_score for e in elements]
confidence_scores = [e.confidence for e in elements]
return {
"mean_relevance": np.mean(relevance_scores),
"std_relevance": np.std(relevance_scores),
"mean_confidence": np.mean(confidence_scores),
"std_confidence": np.std(confidence_scores)
}
def _create_cross_group_connections(self, context_groups: List[List[ContextElement]]) -> List[Dict[str, Any]]:
"""Create connections between elements from different groups."""
connections = []
for i, group1 in enumerate(context_groups):
for j, group2 in enumerate(context_groups[i+1:], i+1):
for elem1 in group1:
for elem2 in group2:
similarity = self._calculate_element_similarity(elem1, elem2)
if similarity > 0.5:
connections.append({
"source": elem1.id,
"target": elem2.id,
"strength": similarity,
"groups": (i, j)
})
return connections
def _calculate_element_similarity(self, elem1: ContextElement, elem2: ContextElement) -> float:
"""Calculate similarity between two elements."""
# Simple similarity based on metadata overlap
common_metadata = set(elem1.metadata.keys()) & set(elem2.metadata.keys())
total_metadata = set(elem1.metadata.keys()) | set(elem2.metadata.keys())
if not total_metadata:
return 0.0
metadata_similarity = len(common_metadata) / len(total_metadata)
# Content similarity (simplified)
content1 = str(elem1.content).lower()
content2 = str(elem2.content).lower()
content_words1 = set(content1.split())
content_words2 = set(content2.split())
if not content_words1 and not content_words2:
content_similarity = 1.0
elif not content_words1 or not content_words2:
content_similarity = 0.0
else:
content_similarity = len(content_words1 & content_words2) / len(content_words1 | content_words2)
return (metadata_similarity + content_similarity) / 2
def _calculate_clustering_coefficient(self, edges: List[Dict[str, Any]], nodes: List[Dict[str, Any]]) -> float:
"""Calculate clustering coefficient of the mesh."""
if len(nodes) < 3:
return 0.0
# Create adjacency list
adjacency = defaultdict(set)
for edge in edges:
adjacency[edge["source"]].add(edge["target"])
adjacency[edge["target"]].add(edge["source"])
# Calculate local clustering coefficients
clustering_coeffs = []
for node in nodes:
neighbors = adjacency[node["id"]]
if len(neighbors) < 2:
continue
possible_edges = len(neighbors) * (len(neighbors) - 1) / 2
actual_edges = 0
for neighbor1 in neighbors:
for neighbor2 in neighbors:
if neighbor1 != neighbor2 and neighbor2 in adjacency[neighbor1]:
actual_edges += 1
if possible_edges > 0:
clustering_coeffs.append(actual_edges / possible_edges)
return np.mean(clustering_coeffs) if clustering_coeffs else 0.0
def _analyze_cross_group_presence(self, context_groups: List[List[ContextElement]]) -> Dict[str, int]:
"""Analyze which elements appear in multiple groups."""
element_presence = defaultdict(int)
for group in context_groups:
group_elements = set(elem.id for elem in group)
for elem_id in group_elements:
element_presence[elem_id] += 1
return dict(element_presence)
def _calculate_relationship_strength(self, node1: Dict[str, Any], node2: Dict[str, Any]) -> float:
"""Calculate relationship strength between two nodes."""
# Simple relationship strength based on relevance scores and metadata overlap
relevance_factor = min(node1["relevance_score"], node2["relevance_score"])
# Metadata similarity
metadata1 = node1.get("metadata", {})
metadata2 = node2.get("metadata", {})
if not metadata1 and not metadata2:
metadata_similarity = 1.0
elif not metadata1 or not metadata2:
metadata_similarity = 0.0
else:
common_keys = set(metadata1.keys()) & set(metadata2.keys())
total_keys = set(metadata1.keys()) | set(metadata2.keys())
metadata_similarity = len(common_keys) / max(len(total_keys), 1)
return (relevance_factor + metadata_similarity) / 2
def _calculate_graph_metrics(self, nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> Dict[str, float]:
"""Calculate metrics of the knowledge graph."""
if not nodes:
return {}
num_nodes = len(nodes)
num_edges = len(edges)
# Basic metrics
density = num_edges / max(num_nodes * (num_nodes - 1) / 2, 1)
# Average degree
degrees = defaultdict(int)
for edge in edges:
degrees[edge["source"]] += 1
degrees[edge["target"]] += 1
avg_degree = np.mean(list(degrees.values())) if degrees else 0
return {
"nodes": num_nodes,
"edges": num_edges,
"density": density,
"average_degree": avg_degree,
"connected_components": self._count_connected_components(nodes, edges)
}
def _count_connected_components(self, nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> int:
"""Count connected components in the graph."""
if not nodes:
return 0
# Build adjacency list
adjacency = defaultdict(set)
for edge in edges:
adjacency[edge["source"]].add(edge["target"])
adjacency[edge["target"]].add(edge["source"])
# Find connected components using DFS
visited = set()
components = 0
for node in nodes:
if node["id"] not in visited:
self._dfs(node["id"], adjacency, visited)
components += 1
return components
def _dfs(self, node: str, adjacency: Dict[str, set], visited: set):
"""Depth-first search for connected components."""
visited.add(node)
for neighbor in adjacency[node]:
if neighbor not in visited:
self._dfs(neighbor, adjacency, visited)
def _detect_communities(self, nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> List[List[str]]:
"""Detect communities in the graph (simplified algorithm)."""
# Simple community detection based on edge density
if not nodes or not edges:
return [[node["id"]] for node in nodes]
# Group nodes by their connectivity patterns
communities = []
remaining_nodes = [node["id"] for node in nodes]
while remaining_nodes:
# Start a new community
community = [remaining_nodes.pop(0)]
# Add connected nodes
to_add = True
while to_add:
to_add = False
for node_id in remaining_nodes[:]:
# Check if node is well-connected to community
connections = sum(1 for edge in edges
if (edge["source"] == node_id and edge["target"] in community) or
(edge["target"] == node_id and edge["source"] in community))
if connections >= len(community) * 0.3: # At least 30% connection rate
community.append(node_id)
remaining_nodes.remove(node_id)
to_add = True
break
communities.append(community)
return communities
def _calculate_recency_weight(self, timestamp: datetime) -> float:
"""Calculate recency weight based on timestamp."""
age_seconds = (datetime.utcnow() - timestamp).total_seconds()
return max(0, 1 - age_seconds / 86400) # Decay over 24 hours
def _calculate_diversity_weight(self, element: ContextElement, context_groups: List[List[ContextElement]]) -> float:
"""Calculate diversity weight based on element uniqueness."""
# Simple diversity calculation based on content uniqueness
content = str(element.content).lower()
word_count = len(content.split())
# Normalize by typical word count (assuming 20 words is average)
diversity_score = min(1.0, word_count / 20.0)
return diversity_score
def _calculate_attention_distribution(self, attention_weights: Dict[str, float]) -> Dict[str, float]:
"""Calculate attention weight distribution."""
if not attention_weights:
return {}
weights = list(attention_weights.values())
return {
"mean": np.mean(weights),
"std": np.std(weights),
"min": np.min(weights),
"max": np.max(weights),
"top_10_percent": np.percentile(weights, 90)
}
def _calculate_attention_entropy(self, attention_weights: Dict[str, float]) -> float:
"""Calculate entropy of attention distribution."""
if not attention_weights:
return 0.0
weights = list(attention_weights.values())
total = sum(weights)
if total == 0:
return 0.0
# Normalize to probabilities
probs = [w / total for w in weights]
# Calculate entropy
entropy = -sum(p * np.log2(p) for p in probs if p > 0)
return entropy
def _combine_synthesis_results(self, synthesis_results: Dict[str, Any]) -> Dict[str, Any]:
"""Combine results from multiple synthesis methods."""
combined_elements = {}
# Combine elements from all methods
for method_name, result in synthesis_results.items():
if "elements" in result:
for element in result["elements"]:
element_id = element["id"]
if element_id not in combined_elements:
combined_elements[element_id] = {
"id": element_id,
"content": element["content"],
"methods": [method_name],
"confidence_scores": [element.get("confidence", element.get("relevance_score", 0.5))]
}
else:
combined_elements[element_id]["methods"].append(method_name)
if "confidence" in element:
combined_elements[element_id]["confidence_scores"].append(element["confidence"])
# Calculate combined confidence
for element_id, element_data in combined_elements.items():
scores = element_data["confidence_scores"]
element_data["combined_confidence"] = np.mean(scores) if scores else 0.5
element_data["method_agreement"] = len(element_data["methods"])
return {
"combined_elements": list(combined_elements.values()),
"total_unique_elements": len(combined_elements),
"average_method_agreement": np.mean([elem["method_agreement"] for elem in combined_elements.values()])
}
def _calculate_method_agreement(self, synthesis_results: Dict[str, Any]) -> float:
"""Calculate agreement between synthesis methods."""
if len(synthesis_results) < 2:
return 1.0
element_sets = []
for method_name, result in synthesis_results.items():
if "elements" in result:
element_ids = set(elem["id"] for elem in result["elements"])
element_sets.append(element_ids)
if not element_sets:
return 0.0
# Calculate Jaccard similarity between all pairs
similarities = []
for i in range(len(element_sets)):
for j in range(i+1, len(element_sets)):
intersection = len(element_sets[i] & element_sets[j])
union = len(element_sets[i] | element_sets[j])
similarity = intersection / max(union, 1)
similarities.append(similarity)
return np.mean(similarities) if similarities else 0.0
def _calculate_synthesis_quality(self, result: Dict[str, Any]) -> float:
"""Calculate overall quality of synthesis result."""
if "elements" in result:
elements = result["elements"]
if not elements:
return 0.0
avg_relevance = np.mean([elem.get("relevance_score", 0.5) for elem in elements])
avg_confidence = np.mean([elem.get("confidence", 0.5) for elem in elements])
return (avg_relevance + avg_confidence) / 2
return 0.5 # Default quality score
def _calculate_synthesis_confidence(self, result: Dict[str, Any]) -> float:
"""Calculate confidence in synthesis result."""
# Base confidence on method and coverage
method_confidence = {
"fusion": 0.8,
"mesh": 0.7,
"hierarchical": 0.9,
"graph_based": 0.6,
"attention_based": 0.8,
"ensemble": 0.95
}
base_confidence = method_confidence.get(result.get("method", ""), 0.7)
# Adjust based on result coverage
if "elements" in result:
coverage_bonus = min(0.2, len(result["elements"]) / 100)
return min(1.0, base_confidence + coverage_bonus)
return base_confidence
def _calculate_temporal_scope(self, context_groups: List[List[ContextElement]]) -> Tuple[datetime, datetime]:
"""Calculate temporal scope of synthesized context."""
all_timestamps = []
for group in context_groups:
for element in group:
all_timestamps.append(element.timestamp)
if all_timestamps:
return (min(all_timestamps), max(all_timestamps))
else:
current_time = datetime.utcnow()
return (current_time, current_time)
def _identify_semantic_clusters(self, result: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Identify semantic clusters in the synthesis result."""
clusters = []
if "elements" in result:
elements = result["elements"]
# Simple clustering based on content similarity
if len(elements) > 1:
# Group elements with similar themes
themes = defaultdict(list)
for element in elements:
# Extract theme from content
content = str(element.get("content", "")).lower()
if "urgent" in content or "asap" in content:
themes["urgency"].append(element)
elif "team" in content or "collaborate" in content:
themes["collaboration"].append(element)
elif "data" in content or "analysis" in content:
themes["data_analysis"].append(element)
else:
themes["general"].append(element)
for theme, theme_elements in themes.items():
if len(theme_elements) > 1:
clusters.append({
"theme": theme,
"elements": [elem["id"] for elem in theme_elements],
"size": len(theme_elements),
"coherence": len(theme_elements) / len(elements)
})
return clusters
# Integration with main context engineering system
class ContextCompressionAndSynthesis:
    """Facade that chains context compression and context synthesis.

    Owns one ``ContextCompressionEngine`` and one
    ``ContextSynthesisEngine`` and runs them back to back.
    """

    def __init__(self):
        self.compression_engine = ContextCompressionEngine()
        self.synthesis_engine = ContextSynthesisEngine()

    async def process_context_pipeline(
        self,
        input_context: List[ContextElement],
        compression_strategy: CompressionStrategy = CompressionStrategy.ADAPTIVE,
        synthesis_method: SynthesisMethod = SynthesisMethod.FUSION,
        quality_threshold: float = 0.6
    ) -> Dict[str, Any]:
        """Compress ``input_context`` and synthesize the compressed elements.

        Returns:
            Dict with the raw ``compression`` result, a ``synthesis``
            summary (id, method, scores, ISO-formatted temporal scope and
            semantic clusters) and ``pipeline_metrics``.
        """
        # Stage 1: compression
        compression_result = await self.compression_engine.compress_context(
            input_context, compression_strategy, quality_threshold
        )

        # Stage 2: turn the compressed output back into element-like objects
        compressed_elements = self._extract_compressed_elements(compression_result)

        # Stage 3: synthesis; all compressed elements are passed as one group
        synthesis_result = await self.synthesis_engine.synthesize_contexts(
            [compressed_elements], synthesis_method, quality_threshold
        )

        synthesis_summary = {
            "id": synthesis_result.id,
            "method": synthesis_result.synthesis_method.value,
            "quality_score": synthesis_result.quality_score,
            "confidence": synthesis_result.confidence,
            "temporal_scope": {
                "start": synthesis_result.temporal_scope[0].isoformat(),
                "end": synthesis_result.temporal_scope[1].isoformat()
            },
            "semantic_clusters": synthesis_result.semantic_clusters
        }
        pipeline_metrics = self._calculate_pipeline_metrics(compression_result, synthesis_result)

        return {
            "compression": compression_result,
            "synthesis": synthesis_summary,
            "pipeline_metrics": pipeline_metrics
        }

    def _extract_compressed_elements(self, compression_result: Dict[str, Any]) -> List[ContextElement]:
        """Rebuild element-like objects from a compression result.

        Only results whose ``"strategy"`` is ``"hierarchical"`` are
        unpacked; any other strategy yields an empty list. Non-dict
        entries inside a level are skipped. Each object is an instance of
        an ad-hoc class carrying ``id``, ``content``, ``relevance_score``,
        ``confidence``, ``timestamp`` and ``metadata``.
        """
        if compression_result.get("strategy") != "hierarchical":
            return []

        elements = []
        for level_name, level_data in compression_result.get("levels", {}).items():
            for compressed_elem in level_data:
                if not isinstance(compressed_elem, dict):
                    continue

                # Missing fields fall back to neutral defaults
                attributes = {
                    'id': compressed_elem.get("id", f"compressed_{level_name}_{len(elements)}"),
                    'content': compressed_elem.get("content", compressed_elem.get("summary", "")),
                    'relevance_score': compressed_elem.get("relevance_score", 0.5),
                    'confidence': compressed_elem.get("confidence", 0.5),
                    'timestamp': datetime.utcnow(),
                    'metadata': compressed_elem.get("metadata", {})
                }
                elements.append(type('ContextElement', (), attributes)())

        return elements

    def _calculate_pipeline_metrics(
        self,
        compression_result: Dict[str, Any],
        synthesis_result: SynthesizedContext
    ) -> Dict[str, Any]:
        """Derive summary metrics for one compression-synthesis run."""
        scope = synthesis_result.temporal_scope
        span_hours = (scope[1] - scope[0]).total_seconds() / 3600

        # The efficiency divisor never drops below 0.1 to avoid dividing by zero
        efficiency_divisor = max(compression_result.get("compression_ratio", 1), 0.1)

        return {
            "compression_ratio": compression_result.get("compression_ratio", 0),
            "synthesis_quality": synthesis_result.quality_score,
            "synthesis_confidence": synthesis_result.confidence,
            "semantic_clusters": len(synthesis_result.semantic_clusters),
            "temporal_span_hours": span_hours,
            "processing_efficiency": synthesis_result.quality_score / efficiency_divisor
        }
if __name__ == "__main__":
    # Script entry point: print a banner, build the integrated system, report readiness
    print("Context Compression and Synthesis System Initialized", "=" * 60, sep="\n")
    system = ContextCompressionAndSynthesis()
    print("Ready for advanced context compression and multi-source synthesis!")