"""
Context Compression and Synthesis System
=========================================

Advanced context compression and efficient encoding system with synthesis capabilities
for integrating data from diverse sources and modalities within unified contextual models.
"""

import asyncio
import json
import logging
import pickle
import time
import zlib
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple, Union, Set
from dataclasses import dataclass, field
from enum import Enum
import numpy as np
from collections import defaultdict, deque
import hashlib
import base64

# ContextVector is defined locally in this module for compression-specific
# vectors, so it is not re-imported from the framework to avoid shadowing.
from ai_agent_framework.core.context_engineering_agent import (
    ContextElement, ContextModality, ContextDimension
)

logger = logging.getLogger(__name__)


class CompressionStrategy(Enum):
    """Context compression strategies."""
    HIERARCHICAL = "hierarchical"
    SEMANTIC = "semantic"
    TEMPORAL = "temporal"
    RELEVANCE = "relevance"
    MULTIMODAL = "multimodal"
    ADAPTIVE = "adaptive"


class SynthesisMethod(Enum):
    """Context synthesis methods."""
    FUSION = "fusion"
    MESH = "mesh"
    HIERARCHICAL = "hierarchical"
    GRAPH_BASED = "graph_based"
    ATTENTION_BASED = "attention_based"
    ENSEMBLE = "ensemble"


@dataclass
class CompressionConfig:
    """Configuration for context compression."""
    strategy: CompressionStrategy
    compression_ratio: float
    quality_threshold: float
    adaptive_threshold: float
    max_context_size: int
    expiry_policy: Dict[str, Any]


@dataclass
class SynthesizedContext:
    """Represents synthesized context from multiple sources."""
    id: str
    source_elements: List[str]
    synthesis_method: SynthesisMethod
    compressed_representation: Dict[str, Any]
    quality_score: float
    confidence: float
    temporal_scope: Tuple[datetime, datetime]
    semantic_clusters: List[Dict[str, Any]]

    def __post_init__(self):
        if not self.id:
            self.id = hashlib.md5(str(self.source_elements).encode()).hexdigest()


@dataclass
class ContextVector:
    """Module-local vector representation of context for compression."""
    element_id: str
    vector: np.ndarray
    vector_type: str
    compression_level: int
    quality_score: float
    timestamp: datetime

    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.utcnow()


class ContextCompressionEngine: |
|
|
"""Advanced context compression engine with multiple strategies.""" |
|
|
|
|
|
def __init__(self): |
|
|
self.compression_configs = self._initialize_compression_configs() |
|
|
self.vector_index = {} |
|
|
self.compression_cache = {} |
|
|
self.quality_metrics = {} |
|
|
|
|
|
def _initialize_compression_configs(self) -> Dict[CompressionStrategy, CompressionConfig]: |
|
|
"""Initialize compression configurations for different strategies.""" |
|
|
return { |
|
|
CompressionStrategy.HIERARCHICAL: CompressionConfig( |
|
|
strategy=CompressionStrategy.HIERARCHICAL, |
|
|
compression_ratio=0.7, |
|
|
quality_threshold=0.6, |
|
|
adaptive_threshold=0.5, |
|
|
max_context_size=100, |
|
|
expiry_policy={"level_1": timedelta(minutes=30), "level_2": timedelta(hours=2)} |
|
|
), |
|
|
CompressionStrategy.SEMANTIC: CompressionConfig( |
|
|
strategy=CompressionStrategy.SEMANTIC, |
|
|
compression_ratio=0.8, |
|
|
quality_threshold=0.7, |
|
|
adaptive_threshold=0.6, |
|
|
max_context_size=50, |
|
|
expiry_policy={"core_concepts": timedelta(hours=6), "supporting": timedelta(hours=1)} |
|
|
), |
|
|
CompressionStrategy.TEMPORAL: CompressionConfig( |
|
|
strategy=CompressionStrategy.TEMPORAL, |
|
|
compression_ratio=0.6, |
|
|
quality_threshold=0.5, |
|
|
adaptive_threshold=0.4, |
|
|
max_context_size=200, |
|
|
expiry_policy={"immediate": timedelta(minutes=15), "recent": timedelta(hours=4)} |
|
|
), |
|
|
CompressionStrategy.RELEVANCE: CompressionConfig( |
|
|
strategy=CompressionStrategy.RELEVANCE, |
|
|
compression_ratio=0.9, |
|
|
quality_threshold=0.8, |
|
|
adaptive_threshold=0.7, |
|
|
max_context_size=30, |
|
|
expiry_policy={"high_relevance": timedelta(days=1), "medium": timedelta(hours=6)} |
|
|
), |
|
|
CompressionStrategy.MULTIMODAL: CompressionConfig( |
|
|
strategy=CompressionStrategy.MULTIMODAL, |
|
|
compression_ratio=0.5, |
|
|
quality_threshold=0.6, |
|
|
adaptive_threshold=0.5, |
|
|
max_context_size=150, |
|
|
expiry_policy={"text": timedelta(hours=2), "visual": timedelta(minutes=30)} |
|
|
) |
|
|
} |
|
|
|
|
|
async def compress_context( |
|
|
self, |
|
|
context_elements: List[ContextElement], |
|
|
strategy: CompressionStrategy = CompressionStrategy.ADAPTIVE, |
|
|
quality_threshold: float = 0.6 |
|
|
) -> Dict[str, Any]: |
|
|
"""Compress context using specified strategy.""" |
|
|
try: |
|
|
if strategy == CompressionStrategy.ADAPTIVE: |
|
|
strategy = await self._select_optimal_strategy(context_elements) |
|
|
|
|
|
config = self.compression_configs.get(strategy) |
|
|
if not config: |
|
|
strategy = CompressionStrategy.HIERARCHICAL |
|
|
config = self.compression_configs[strategy] |
|
|
|
|
|
if strategy == CompressionStrategy.HIERARCHICAL: |
|
|
return await self._hierarchical_compression(context_elements, config, quality_threshold) |
|
|
elif strategy == CompressionStrategy.SEMANTIC: |
|
|
return await self._semantic_compression(context_elements, config, quality_threshold) |
|
|
elif strategy == CompressionStrategy.TEMPORAL: |
|
|
return await self._temporal_compression(context_elements, config, quality_threshold) |
|
|
elif strategy == CompressionStrategy.RELEVANCE: |
|
|
return await self._relevance_compression(context_elements, config, quality_threshold) |
|
|
elif strategy == CompressionStrategy.MULTIMODAL: |
|
|
return await self._multimodal_compression(context_elements, config, quality_threshold) |
|
|
else: |
|
|
return await self._hierarchical_compression(context_elements, config, quality_threshold) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Context compression failed: {e}") |
|
|
return {"status": "error", "error": str(e)} |
|
|
|
|
|
async def _select_optimal_strategy(self, context_elements: List[ContextElement]) -> CompressionStrategy: |
|
|
"""Select optimal compression strategy based on context characteristics.""" |
|
|
if not context_elements: |
|
|
return CompressionStrategy.HIERARCHICAL |
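        # Heuristic strategy selection: many modalities favour multimodal
        # compression, a wide temporal spread favours temporal windows, high
        # variance in relevance favours relevance tiers, and everything else
        # falls back to semantic grouping.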
|
|
|
|
|
|
|
|
modalities = set(element.modality for element in context_elements) |
|
|
temporal_spread = self._calculate_temporal_spread(context_elements) |
|
|
relevance_variance = np.var([element.relevance_score for element in context_elements]) |
|
|
|
|
|
|
|
|
if len(modalities) > 3: |
|
|
return CompressionStrategy.MULTIMODAL |
|
|
elif temporal_spread > timedelta(hours=2): |
|
|
return CompressionStrategy.TEMPORAL |
|
|
elif relevance_variance > 0.1: |
|
|
return CompressionStrategy.RELEVANCE |
|
|
else: |
|
|
return CompressionStrategy.SEMANTIC |
|
|
|
|
|
def _calculate_temporal_spread(self, context_elements: List[ContextElement]) -> timedelta: |
|
|
"""Calculate temporal spread of context elements.""" |
|
|
if len(context_elements) < 2: |
|
|
return timedelta(0) |
|
|
|
|
|
timestamps = [element.timestamp for element in context_elements] |
|
|
return max(timestamps) - min(timestamps) |
|
|
|
|
|
async def _hierarchical_compression( |
|
|
self, |
|
|
context_elements: List[ContextElement], |
|
|
config: CompressionConfig, |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Perform hierarchical context compression.""" |
|
|
|
|
|
|
|
|
core_context = [ |
|
|
element for element in context_elements |
|
|
if element.relevance_score > 0.8 and element.confidence > 0.7 |
|
|
] |
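        # Core elements (relevance > 0.8 and confidence > 0.7) selected above are
        # kept nearly intact; the supporting (0.5 < relevance <= 0.8) and
        # peripheral (relevance <= 0.5) tiers below are compressed with
        # progressively more loss.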
|
|
|
|
|
|
|
|
supporting_context = [ |
|
|
element for element in context_elements |
|
|
if 0.5 < element.relevance_score <= 0.8 |
|
|
] |
|
|
|
|
|
|
|
|
peripheral_context = [ |
|
|
element for element in context_elements |
|
|
if element.relevance_score <= 0.5 |
|
|
] |
|
|
|
|
|
|
|
|
compressed_core = await self._compress_element_group(core_context, 0.9, "core") |
|
|
compressed_supporting = await self._compress_element_group(supporting_context, 0.7, "supporting") |
|
|
compressed_peripheral = await self._compress_element_group(peripheral_context, 0.5, "peripheral") |
|
|
|
|
|
        return {
            "strategy": "hierarchical",
            "levels": {
                "core": compressed_core,
                "supporting": compressed_supporting,
                "peripheral": compressed_peripheral
            },
            "compression_ratio": (len(compressed_core) + len(compressed_supporting) + len(compressed_peripheral)) / max(len(context_elements), 1),
            "quality_threshold": quality_threshold,
            "total_elements": len(context_elements),
            "compressed_elements": len(compressed_core) + len(compressed_supporting) + len(compressed_peripheral)
        }
|
|
|
|
|
async def _semantic_compression( |
|
|
self, |
|
|
context_elements: List[ContextElement], |
|
|
config: CompressionConfig, |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Perform semantic context compression.""" |
|
|
|
|
|
|
|
|
semantic_groups = await self._group_by_semantic_similarity(context_elements) |
|
|
|
|
|
compressed_groups = [] |
|
|
total_original_size = 0 |
|
|
total_compressed_size = 0 |
|
|
|
|
|
for group in semantic_groups: |
|
|
|
|
|
group_elements = group["elements"] |
|
|
group_theme = group["theme"] |
|
|
|
|
|
compressed_group = await self._compress_semantic_group(group_elements, group_theme, quality_threshold) |
|
|
|
|
|
total_original_size += len(group_elements) |
|
|
total_compressed_size += len(compressed_group["compressed_elements"]) |
|
|
|
|
|
compressed_groups.append({ |
|
|
"theme": group_theme, |
|
|
"elements": compressed_group["compressed_elements"], |
|
|
"key_concepts": compressed_group["key_concepts"], |
|
|
"representative_embedding": compressed_group["representative_embedding"], |
|
|
"confidence": compressed_group["confidence"] |
|
|
}) |
|
|
|
|
|
return { |
|
|
"strategy": "semantic", |
|
|
"groups": compressed_groups, |
|
|
"compression_ratio": total_compressed_size / max(total_original_size, 1), |
|
|
"quality_threshold": quality_threshold, |
|
|
"semantic_coherence": self._calculate_semantic_coherence(compressed_groups) |
|
|
} |
|
|
|
|
|
async def _temporal_compression( |
|
|
self, |
|
|
context_elements: List[ContextElement], |
|
|
config: CompressionConfig, |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Perform temporal context compression.""" |
|
|
|
|
|
|
|
|
sorted_elements = sorted(context_elements, key=lambda e: e.timestamp) |
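        # Bucket elements into immediate (last 30 minutes), recent (last 4 hours)
        # and historical windows, then compress each window at a different level
        # of detail.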
|
|
|
|
|
|
|
|
current_time = datetime.utcnow() |
|
|
immediate_window = current_time - timedelta(minutes=30) |
|
|
recent_window = current_time - timedelta(hours=4) |
|
|
|
|
|
temporal_windows = { |
|
|
"immediate": [], |
|
|
"recent": [], |
|
|
"historical": [] |
|
|
} |
|
|
|
|
|
for element in sorted_elements: |
|
|
if element.timestamp >= immediate_window: |
|
|
temporal_windows["immediate"].append(element) |
|
|
elif element.timestamp >= recent_window: |
|
|
temporal_windows["recent"].append(element) |
|
|
else: |
|
|
temporal_windows["historical"].append(element) |
|
|
|
|
|
|
|
|
compressed_windows = {} |
|
|
for window_name, elements in temporal_windows.items(): |
|
|
if elements: |
|
|
compressed_windows[window_name] = await self._compress_temporal_window( |
|
|
elements, window_name, quality_threshold |
|
|
) |
|
|
|
|
|
return { |
|
|
"strategy": "temporal", |
|
|
"windows": compressed_windows, |
|
|
"temporal_distribution": {name: len(elements) for name, elements in temporal_windows.items()}, |
|
|
"compression_ratio": sum(len(window.get("compressed_elements", [])) |
|
|
for window in compressed_windows.values()) / max(len(context_elements), 1) |
|
|
} |
|
|
|
|
|
async def _relevance_compression( |
|
|
self, |
|
|
context_elements: List[ContextElement], |
|
|
config: CompressionConfig, |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Perform relevance-based context compression.""" |
|
|
|
|
|
|
|
|
sorted_elements = sorted(context_elements, key=lambda e: e.relevance_score, reverse=True) |
|
|
|
|
|
|
|
|
high_relevance = [e for e in sorted_elements if e.relevance_score > 0.8] |
|
|
medium_relevance = [e for e in sorted_elements if 0.5 < e.relevance_score <= 0.8] |
|
|
low_relevance = [e for e in sorted_elements if e.relevance_score <= 0.5] |
|
|
|
|
|
|
|
|
compressed_high = await self._compress_with_minimal_loss(high_relevance, "high_relevance") |
|
|
compressed_medium = await self._compress_with_standard_loss(medium_relevance, "medium_relevance") |
|
|
compressed_low = await self._compress_aggressively(low_relevance, "low_relevance") |
|
|
|
|
|
return { |
|
|
"strategy": "relevance", |
|
|
"relevance_tiers": { |
|
|
"high": compressed_high, |
|
|
"medium": compressed_medium, |
|
|
"low": compressed_low |
|
|
}, |
|
|
"preservation_rate": { |
|
|
"high": len(compressed_high) / max(len(high_relevance), 1), |
|
|
"medium": len(compressed_medium) / max(len(medium_relevance), 1), |
|
|
"low": len(compressed_low) / max(len(low_relevance), 1) |
|
|
}, |
|
|
"compression_ratio": (len(compressed_high) + len(compressed_medium) + len(compressed_low)) / max(len(context_elements), 1) |
|
|
} |
|
|
|
|
|
async def _multimodal_compression( |
|
|
self, |
|
|
context_elements: List[ContextElement], |
|
|
config: CompressionConfig, |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Perform multimodal context compression.""" |
|
|
|
|
|
|
|
|
modality_groups = defaultdict(list) |
|
|
for element in context_elements: |
|
|
modality_groups[element.modality].append(element) |
|
|
|
|
|
compressed_modalities = {} |
|
|
total_original = 0 |
|
|
total_compressed = 0 |
|
|
|
|
|
for modality, elements in modality_groups.items(): |
|
|
compressed_modality = await self._compress_modality_group(elements, modality, quality_threshold) |
|
|
compressed_modalities[modality.value] = compressed_modality |
|
|
|
|
|
total_original += len(elements) |
|
|
total_compressed += len(compressed_modality["compressed_elements"]) |
|
|
|
|
|
return { |
|
|
"strategy": "multimodal", |
|
|
"modalities": compressed_modalities, |
|
|
"cross_modal_correlations": await self._analyze_cross_modal_correlations(modality_groups), |
|
|
"compression_ratio": total_compressed / max(total_original, 1) |
|
|
} |
|
|
|
|
|
async def _compress_element_group( |
|
|
self, |
|
|
elements: List[ContextElement], |
|
|
compression_level: float, |
|
|
group_type: str |
|
|
) -> List[Dict[str, Any]]: |
|
|
"""Compress a group of elements.""" |
|
|
if not elements: |
|
|
return [] |
|
|
|
|
|
compressed = [] |
|
|
|
|
|
for element in elements: |
|
|
if group_type == "core": |
|
|
|
|
|
compressed.append({ |
|
|
"id": element.id, |
|
|
"content": element.content, |
|
|
"metadata": element.metadata, |
|
|
"full_representation": True |
|
|
}) |
|
|
elif group_type == "supporting": |
|
|
|
|
|
compressed.append({ |
|
|
"id": element.id, |
|
|
"summary": self._generate_summary(element.content), |
|
|
"key_points": element.metadata.get("key_points", []), |
|
|
"confidence": element.confidence |
|
|
}) |
|
|
else: |
|
|
|
|
|
compressed.append({ |
|
|
"id": element.id, |
|
|
"theme": element.metadata.get("theme", str(element.content)[:50]), |
|
|
"relevance_score": element.relevance_score, |
|
|
"abstract": True |
|
|
}) |
|
|
|
|
|
return compressed |
|
|
|
|
|
async def _group_by_semantic_similarity(self, context_elements: List[ContextElement]) -> List[Dict[str, Any]]: |
|
|
"""Group context elements by semantic similarity.""" |
|
|
|
|
|
|
|
|
groups = defaultdict(list) |
|
|
|
|
|
for element in context_elements: |
|
|
|
|
|
theme = self._extract_semantic_theme(element) |
|
|
groups[theme].append(element) |
|
|
|
|
|
|
|
|
semantic_groups = [] |
|
|
for theme, elements in groups.items(): |
|
|
semantic_groups.append({ |
|
|
"theme": theme, |
|
|
"elements": elements, |
|
|
"coherence_score": self._calculate_group_coherence(elements) |
|
|
}) |
|
|
|
|
|
return semantic_groups |
|
|
|
|
|
def _extract_semantic_theme(self, element: ContextElement) -> str: |
|
|
"""Extract semantic theme from context element.""" |
|
|
content = str(element.content).lower() |
|
|
metadata = element.metadata |
|
|
|
|
|
|
|
|
if any(word in content for word in ["urgent", "asap", "deadline"]): |
|
|
return "urgency" |
|
|
elif any(word in content for word in ["team", "collaborate", "group"]): |
|
|
return "collaboration" |
|
|
elif any(word in content for word in ["data", "analysis", "metrics"]): |
|
|
return "data_analysis" |
|
|
elif any(word in content for word in ["customer", "client", "user"]): |
|
|
return "customer_oriented" |
|
|
else: |
|
|
return "general" |
|
|
|
|
|
def _calculate_group_coherence(self, elements: List[ContextElement]) -> float: |
|
|
"""Calculate semantic coherence of a group.""" |
|
|
if len(elements) < 2: |
|
|
return 1.0 |
|
|
|
|
|
|
|
|
similarities = [] |
|
|
for i, elem1 in enumerate(elements): |
|
|
for j, elem2 in enumerate(elements[i+1:], i+1): |
|
|
|
|
|
overlap = len(set(elem1.metadata.keys()) & set(elem2.metadata.keys())) |
|
|
total_keys = len(set(elem1.metadata.keys()) | set(elem2.metadata.keys())) |
|
|
similarity = overlap / max(total_keys, 1) |
|
|
similarities.append(similarity) |
|
|
|
|
|
return np.mean(similarities) if similarities else 0.5 |
|
|
|
|
|
def _generate_summary(self, content: Any) -> str: |
|
|
"""Generate summary of content.""" |
|
|
content_str = str(content) |
|
|
if len(content_str) <= 100: |
|
|
return content_str |
|
|
|
|
|
|
|
|
sentences = content_str.split('.') |
|
|
if len(sentences) > 2: |
|
|
return '. '.join(sentences[:2]) + '.' |
|
|
else: |
|
|
return content_str[:97] + "..." |
|
|
|
|
|
async def _compress_semantic_group( |
|
|
self, |
|
|
elements: List[ContextElement], |
|
|
theme: str, |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Compress a semantic group of elements.""" |
|
|
|
|
|
|
|
|
key_concepts = await self._extract_key_concepts(elements) |
|
|
|
|
|
|
|
|
representative_embedding = await self._generate_representative_embedding(elements) |
|
|
|
|
|
|
|
|
representative_elements = sorted( |
|
|
elements, |
|
|
key=lambda e: e.relevance_score * e.confidence, |
|
|
reverse=True |
|
|
)[:min(5, len(elements))] |
|
|
|
|
|
return { |
|
|
"theme": theme, |
|
|
"compressed_elements": [self._element_to_summary(e) for e in representative_elements], |
|
|
"key_concepts": key_concepts, |
|
|
"representative_embedding": representative_embedding.tolist() if isinstance(representative_embedding, np.ndarray) else representative_embedding, |
|
|
"confidence": np.mean([e.confidence for e in elements]) |
|
|
} |
|
|
|
|
|
async def _extract_key_concepts(self, elements: List[ContextElement]) -> List[str]: |
|
|
"""Extract key concepts from semantic group.""" |
|
|
concepts = [] |
|
|
concept_scores = defaultdict(float) |
|
|
|
|
|
for element in elements: |
|
|
content = str(element.content).lower() |
|
|
metadata = element.metadata |
|
|
|
|
|
|
|
|
words = content.split() |
|
|
for word in words: |
|
|
if len(word) > 3: |
|
|
concept_scores[word] += element.relevance_score |
|
|
|
|
|
|
|
|
for key, value in metadata.items(): |
|
|
if isinstance(value, str): |
|
|
concept_scores[value.lower()] += element.confidence * 0.5 |
|
|
|
|
|
|
|
|
sorted_concepts = sorted(concept_scores.items(), key=lambda x: x[1], reverse=True) |
|
|
return [concept for concept, score in sorted_concepts[:10]] |
|
|
|
|
|
async def _generate_representative_embedding(self, elements: List[ContextElement]) -> np.ndarray: |
|
|
"""Generate representative embedding for semantic group.""" |
|
|
if not elements: |
|
|
return np.zeros(128) |
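        # Placeholder embeddings: a deterministic pseudo-random 128-dimensional
        # vector is derived from each element id, scaled by relevance and then
        # averaged; this stands in for a learned embedding model.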
|
|
|
|
|
|
|
|
embeddings = [] |
|
|
for element in elements: |
|
|
|
|
|
np.random.seed(hash(element.id) % (2**32)) |
|
|
embedding = np.random.rand(128) |
|
|
embeddings.append(embedding * element.relevance_score) |
|
|
|
|
|
if embeddings: |
|
|
return np.mean(embeddings, axis=0) |
|
|
else: |
|
|
return np.zeros(128) |
|
|
|
|
|
def _element_to_summary(self, element: ContextElement) -> Dict[str, Any]: |
|
|
"""Convert element to summary format.""" |
|
|
return { |
|
|
"id": element.id, |
|
|
"content_summary": self._generate_summary(element.content), |
|
|
"relevance_score": element.relevance_score, |
|
|
"confidence": element.confidence, |
|
|
"key_metadata": {k: v for k, v in list(element.metadata.items())[:3]} |
|
|
} |
|
|
|
|
|
async def _compress_temporal_window( |
|
|
self, |
|
|
elements: List[ContextElement], |
|
|
window_name: str, |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Compress a temporal window of elements.""" |
|
|
|
|
|
|
|
|
if window_name == "immediate": |
|
|
|
|
|
compression_level = 0.9 |
|
|
elif window_name == "recent": |
|
|
|
|
|
compression_level = 0.7 |
|
|
else: |
|
|
|
|
|
compression_level = 0.4 |
|
|
|
|
|
compressed_elements = [] |
|
|
for element in elements: |
|
|
if element.relevance_score > quality_threshold: |
|
|
if compression_level > 0.8: |
|
|
compressed_elements.append(self._element_to_summary(element)) |
|
|
else: |
|
|
compressed_elements.append({ |
|
|
"id": element.id, |
|
|
"content_summary": self._generate_summary(element.content), |
|
|
"relevance_score": element.relevance_score |
|
|
}) |
|
|
|
|
|
return { |
|
|
"window_name": window_name, |
|
|
"compression_level": compression_level, |
|
|
"compressed_elements": compressed_elements, |
|
|
"time_range": { |
|
|
"start": min(e.timestamp for e in elements).isoformat(), |
|
|
"end": max(e.timestamp for e in elements).isoformat() |
|
|
} |
|
|
} |
|
|
|
|
|
async def _compress_with_minimal_loss(self, elements: List[ContextElement], tier: str) -> List[Dict[str, Any]]: |
|
|
"""Compress with minimal information loss.""" |
|
|
return [self._element_to_summary(e) for e in elements] |
|
|
|
|
|
async def _compress_with_standard_loss(self, elements: List[ContextElement], tier: str) -> List[Dict[str, Any]]: |
|
|
"""Compress with standard information loss.""" |
|
|
compressed = [] |
|
|
for element in elements: |
|
|
compressed.append({ |
|
|
"id": element.id, |
|
|
"summary": self._generate_summary(element.content), |
|
|
"relevance_score": element.relevance_score, |
|
|
"confidence": element.confidence |
|
|
}) |
|
|
return compressed |
|
|
|
|
|
async def _compress_aggressively(self, elements: List[ContextElement], tier: str) -> List[Dict[str, Any]]: |
|
|
"""Compress aggressively with significant information loss.""" |
|
|
compressed = [] |
|
|
for element in elements: |
|
|
compressed.append({ |
|
|
"id": element.id, |
|
|
"theme": self._extract_semantic_theme(element), |
|
|
"relevance_score": element.relevance_score, |
|
|
"confidence": round(element.confidence, 2) |
|
|
}) |
|
|
return compressed |
|
|
|
|
|
async def _compress_modality_group( |
|
|
self, |
|
|
elements: List[ContextElement], |
|
|
modality: ContextModality, |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Compress elements of a specific modality.""" |
|
|
|
|
|
|
|
|
if modality == ContextModality.TEXT: |
|
|
return await self._compress_text_modality(elements, quality_threshold) |
|
|
elif modality == ContextModality.VISUAL: |
|
|
return await self._compress_visual_modality(elements, quality_threshold) |
|
|
elif modality == ContextModality.AUDITORY: |
|
|
return await self._compress_auditory_modality(elements, quality_threshold) |
|
|
else: |
|
|
return await self._compress_generic_modality(elements, quality_threshold) |
|
|
|
|
|
async def _compress_text_modality(self, elements: List[ContextElement], quality_threshold: float) -> Dict[str, Any]: |
|
|
"""Compress text modality elements.""" |
|
|
compressed_elements = [] |
|
|
for element in elements: |
|
|
if element.relevance_score > quality_threshold: |
|
|
compressed_elements.append({ |
|
|
"id": element.id, |
|
|
"content_type": "text", |
|
|
"content": self._generate_summary(element.content), |
|
|
"metadata": element.metadata |
|
|
}) |
|
|
|
|
|
return { |
|
|
"modality": "text", |
|
|
"compressed_elements": compressed_elements, |
|
|
"compression_method": "text_summarization" |
|
|
} |
|
|
|
|
|
async def _compress_visual_modality(self, elements: List[ContextElement], quality_threshold: float) -> Dict[str, Any]: |
|
|
"""Compress visual modality elements.""" |
|
|
compressed_elements = [] |
|
|
for element in elements: |
|
|
if element.relevance_score > quality_threshold: |
|
|
|
|
|
compressed_elements.append({ |
|
|
"id": element.id, |
|
|
"content_type": "visual", |
|
|
"visual_descriptor": element.metadata.get("visual_descriptor", str(element.content)[:50]), |
|
|
"metadata": {k: v for k, v in element.metadata.items() if k in ["color", "shape", "size", "location"]} |
|
|
}) |
|
|
|
|
|
return { |
|
|
"modality": "visual", |
|
|
"compressed_elements": compressed_elements, |
|
|
"compression_method": "visual_feature_extraction" |
|
|
} |
|
|
|
|
|
async def _compress_auditory_modality(self, elements: List[ContextElement], quality_threshold: float) -> Dict[str, Any]: |
|
|
"""Compress auditory modality elements.""" |
|
|
compressed_elements = [] |
|
|
for element in elements: |
|
|
if element.relevance_score > quality_threshold: |
|
|
compressed_elements.append({ |
|
|
"id": element.id, |
|
|
"content_type": "auditory", |
|
|
"audio_descriptor": element.metadata.get("audio_descriptor", str(element.content)[:50]), |
|
|
"metadata": {k: v for k, v in element.metadata.items() if k in ["frequency", "amplitude", "duration"]} |
|
|
}) |
|
|
|
|
|
return { |
|
|
"modality": "auditory", |
|
|
"compressed_elements": compressed_elements, |
|
|
"compression_method": "audio_feature_extraction" |
|
|
} |
|
|
|
|
|
async def _compress_generic_modality(self, elements: List[ContextElement], quality_threshold: float) -> Dict[str, Any]: |
|
|
"""Compress generic modality elements.""" |
|
|
compressed_elements = [] |
|
|
for element in elements: |
|
|
if element.relevance_score > quality_threshold: |
|
|
compressed_elements.append({ |
|
|
"id": element.id, |
|
|
"content_summary": self._generate_summary(element.content), |
|
|
"relevance_score": element.relevance_score, |
|
|
"confidence": element.confidence |
|
|
}) |
|
|
|
|
|
return { |
|
|
"modality": "generic", |
|
|
"compressed_elements": compressed_elements, |
|
|
"compression_method": "generic_compression" |
|
|
} |
|
|
|
|
|
async def _analyze_cross_modal_correlations(self, modality_groups: Dict[ContextModality, List[ContextElement]]) -> Dict[str, Any]: |
|
|
"""Analyze correlations between different modalities.""" |
|
|
correlations = {} |
|
|
|
|
|
modalities = list(modality_groups.keys()) |
|
|
for i, mod1 in enumerate(modalities): |
|
|
for mod2 in modalities[i+1:]: |
|
|
elements1 = modality_groups[mod1] |
|
|
elements2 = modality_groups[mod2] |
|
|
|
|
|
|
|
|
temporal_corr = self._calculate_temporal_correlation(elements1, elements2) |
|
|
|
|
|
|
|
|
relevance_corr = self._calculate_relevance_correlation(elements1, elements2) |
|
|
|
|
|
correlations[f"{mod1.value}_{mod2.value}"] = { |
|
|
"temporal_correlation": temporal_corr, |
|
|
"relevance_correlation": relevance_corr, |
|
|
"correlation_strength": (temporal_corr + relevance_corr) / 2 |
|
|
} |
|
|
|
|
|
return correlations |
|
|
|
|
|
def _calculate_temporal_correlation(self, elements1: List[ContextElement], elements2: List[ContextElement]) -> float: |
|
|
"""Calculate temporal correlation between two sets of elements.""" |
|
|
if not elements1 or not elements2: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
timestamps1 = [e.timestamp for e in elements1] |
|
|
timestamps2 = [e.timestamp for e in elements2] |
|
|
|
|
|
|
|
|
avg_diff = 0 |
|
|
count = 0 |
|
|
|
|
|
for t1 in timestamps1: |
|
|
min_diff = min(abs((t1 - t2).total_seconds()) for t2 in timestamps2) |
|
|
avg_diff += min_diff |
|
|
count += 1 |
|
|
|
|
|
if count == 0: |
|
|
return 0.0 |
|
|
|
|
|
avg_diff /= count |
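        # Map the average nearest-timestamp gap onto [0, 1]: a zero gap yields
        # perfect correlation and gaps of one hour (3600 s) or more yield 0.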
|
|
|
|
|
return max(0, 1 - avg_diff / 3600) |
|
|
|
|
|
def _calculate_relevance_correlation(self, elements1: List[ContextElement], elements2: List[ContextElement]) -> float: |
|
|
"""Calculate relevance correlation between two sets of elements.""" |
|
|
if not elements1 or not elements2: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
relevance1 = [e.relevance_score for e in elements1] |
|
|
relevance2 = [e.relevance_score for e in elements2] |
|
|
|
|
|
if len(relevance1) > 1 and len(relevance2) > 1: |
|
|
|
|
|
var1 = np.var(relevance1) |
|
|
var2 = np.var(relevance2) |
|
|
|
|
|
if var1 > 0 and var2 > 0: |
|
|
|
|
|
return min(var1, var2) / max(var1, var2) |
|
|
|
|
|
return 0.5 |
|
|
|
|
|
def _calculate_semantic_coherence(self, compressed_groups: List[Dict[str, Any]]) -> float: |
|
|
"""Calculate overall semantic coherence of compressed context.""" |
|
|
if not compressed_groups: |
|
|
return 0.0 |
|
|
|
|
|
coherence_scores = [group.get("confidence", 0.5) for group in compressed_groups] |
|
|
return np.mean(coherence_scores) |
|
|
|
|
|
|
|
|
class ContextSynthesisEngine: |
|
|
"""Advanced context synthesis engine for integrating diverse data sources.""" |
|
|
|
|
|
def __init__(self): |
|
|
self.synthesis_methods = { |
|
|
SynthesisMethod.FUSION: self._fusion_synthesis, |
|
|
SynthesisMethod.MESH: self._mesh_synthesis, |
|
|
SynthesisMethod.HIERARCHICAL: self._hierarchical_synthesis, |
|
|
SynthesisMethod.GRAPH_BASED: self._graph_based_synthesis, |
|
|
SynthesisMethod.ATTENTION_BASED: self._attention_based_synthesis, |
|
|
SynthesisMethod.ENSEMBLE: self._ensemble_synthesis |
|
|
} |
|
|
self.synthesis_cache = {} |
|
|
|
|
|
async def synthesize_contexts( |
|
|
self, |
|
|
context_groups: List[List[ContextElement]], |
|
|
synthesis_method: SynthesisMethod = SynthesisMethod.FUSION, |
|
|
quality_threshold: float = 0.6 |
|
|
) -> SynthesizedContext: |
|
|
"""Synthesize contexts from multiple sources.""" |
|
|
try: |
|
|
if synthesis_method not in self.synthesis_methods: |
|
|
synthesis_method = SynthesisMethod.FUSION |
|
|
|
|
|
synthesis_func = self.synthesis_methods[synthesis_method] |
|
|
result = await synthesis_func(context_groups, quality_threshold) |
|
|
|
|
|
|
|
|
synthesized_context = SynthesizedContext( |
|
|
id=f"synthesized_{int(time.time())}", |
|
|
source_elements=[elem.id for group in context_groups for elem in group], |
|
|
synthesis_method=synthesis_method, |
|
|
compressed_representation=result, |
|
|
quality_score=self._calculate_synthesis_quality(result), |
|
|
confidence=self._calculate_synthesis_confidence(result), |
|
|
temporal_scope=self._calculate_temporal_scope(context_groups), |
|
|
semantic_clusters=self._identify_semantic_clusters(result) |
|
|
) |
|
|
|
|
|
return synthesized_context |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Context synthesis failed: {e}") |
|
|
return SynthesizedContext( |
|
|
id=f"error_{int(time.time())}", |
|
|
source_elements=[], |
|
|
synthesis_method=synthesis_method, |
|
|
compressed_representation={"error": str(e)}, |
|
|
quality_score=0.0, |
|
|
confidence=0.0, |
|
|
temporal_scope=(datetime.utcnow(), datetime.utcnow()), |
|
|
semantic_clusters=[] |
|
|
) |
|
|
|
|
|
async def _fusion_synthesis( |
|
|
self, |
|
|
context_groups: List[List[ContextElement]], |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Perform fusion-based context synthesis.""" |
|
|
|
|
|
|
|
|
all_elements = [] |
|
|
for group in context_groups: |
|
|
all_elements.extend(group) |
|
|
|
|
|
|
|
|
unique_elements = self._remove_duplicates(all_elements) |
|
|
|
|
|
|
|
|
sorted_elements = sorted( |
|
|
unique_elements, |
|
|
key=lambda e: e.relevance_score * e.confidence, |
|
|
reverse=True |
|
|
) |
|
|
|
|
|
|
|
|
selected_elements = [ |
|
|
e for e in sorted_elements |
|
|
if e.relevance_score * e.confidence > quality_threshold |
|
|
] |
|
|
|
|
|
|
|
|
fused_content = [] |
|
|
for element in selected_elements[:50]: |
|
|
fused_content.append({ |
|
|
"id": element.id, |
|
|
"content": element.content, |
|
|
"source_group": self._identify_source_group(element, context_groups), |
|
|
"relevance_score": element.relevance_score, |
|
|
"confidence": element.confidence, |
|
|
"metadata": element.metadata |
|
|
}) |
|
|
|
|
|
return { |
|
|
"method": "fusion", |
|
|
"elements": fused_content, |
|
|
"fusion_score": len(fused_content) / max(len(unique_elements), 1), |
|
|
"quality_distribution": self._calculate_quality_distribution(selected_elements) |
|
|
} |
|
|
|
|
|
async def _mesh_synthesis( |
|
|
self, |
|
|
context_groups: List[List[ContextElement]], |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Perform mesh-based context synthesis.""" |
|
|
|
|
|
|
|
|
connections = self._create_cross_group_connections(context_groups) |
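        # Build a mesh whose nodes are elements above the relevance threshold and
        # whose edges are the cross-group connections between them; mesh density
        # is the edge count divided by the maximum possible n * (n - 1) / 2.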
|
|
|
|
|
|
|
|
mesh_nodes = [] |
|
|
mesh_edges = [] |
|
|
|
|
|
for group_idx, group in enumerate(context_groups): |
|
|
for element in group: |
|
|
if element.relevance_score > quality_threshold: |
|
|
mesh_nodes.append({ |
|
|
"id": element.id, |
|
|
"content": element.content, |
|
|
"group": group_idx, |
|
|
"relevance_score": element.relevance_score |
|
|
}) |
|
|
|
|
|
|
|
|
        # Keep only connections whose endpoints both survived the relevance filter.
        node_ids = {node["id"] for node in mesh_nodes}
        for connection in connections:
            if connection["source"] in node_ids and connection["target"] in node_ids:
                mesh_edges.append(connection)
|
|
|
|
|
return { |
|
|
"method": "mesh", |
|
|
"nodes": mesh_nodes, |
|
|
"edges": mesh_edges, |
|
|
"mesh_density": len(mesh_edges) / max(len(mesh_nodes) * (len(mesh_nodes) - 1) / 2, 1), |
|
|
"clustering_coefficient": self._calculate_clustering_coefficient(mesh_edges, mesh_nodes) |
|
|
} |
|
|
|
|
|
async def _hierarchical_synthesis( |
|
|
self, |
|
|
context_groups: List[List[ContextElement]], |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Perform hierarchical context synthesis.""" |
|
|
|
|
|
|
|
|
hierarchy = { |
|
|
"level_1": [], |
|
|
"level_2": [], |
|
|
"level_3": [] |
|
|
} |
|
|
|
|
|
|
|
|
element_presence = self._analyze_cross_group_presence(context_groups) |
|
|
|
|
|
for group_idx, group in enumerate(context_groups): |
|
|
for element in group: |
|
|
if element.relevance_score > quality_threshold: |
|
|
presence_count = element_presence.get(element.id, 0) |
|
|
|
|
|
if presence_count > 1: |
|
|
hierarchy["level_1"].append({ |
|
|
"id": element.id, |
|
|
"content": element.content, |
|
|
"groups": self._find_element_groups(element, context_groups), |
|
|
"cross_group_relevance": presence_count, |
|
|
"relevance_score": element.relevance_score |
|
|
}) |
|
|
elif element.relevance_score > 0.7: |
|
|
hierarchy["level_2"].append({ |
|
|
"id": element.id, |
|
|
"content": element.content, |
|
|
"group": group_idx, |
|
|
"relevance_score": element.relevance_score |
|
|
}) |
|
|
else: |
|
|
hierarchy["level_3"].append({ |
|
|
"id": element.id, |
|
|
"content_summary": str(element.content)[:100], |
|
|
"group": group_idx, |
|
|
"relevance_score": element.relevance_score |
|
|
}) |
|
|
|
|
|
return { |
|
|
"method": "hierarchical", |
|
|
"hierarchy": hierarchy, |
|
|
"distribution": { |
|
|
"level_1": len(hierarchy["level_1"]), |
|
|
"level_2": len(hierarchy["level_2"]), |
|
|
"level_3": len(hierarchy["level_3"]) |
|
|
} |
|
|
} |
|
|
|
|
|
async def _graph_based_synthesis( |
|
|
self, |
|
|
context_groups: List[List[ContextElement]], |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Perform graph-based context synthesis.""" |
|
|
|
|
|
|
|
|
nodes = [] |
|
|
edges = [] |
|
|
|
|
|
|
|
|
for group_idx, group in enumerate(context_groups): |
|
|
for element in group: |
|
|
if element.relevance_score > quality_threshold: |
|
|
nodes.append({ |
|
|
"id": element.id, |
|
|
"content": element.content, |
|
|
"group": group_idx, |
|
|
"relevance_score": element.relevance_score, |
|
|
"metadata": element.metadata |
|
|
}) |
|
|
|
|
|
|
|
|
for i, node1 in enumerate(nodes): |
|
|
for j, node2 in enumerate(nodes[i+1:], i+1): |
|
|
relationship_strength = self._calculate_relationship_strength(node1, node2) |
|
|
if relationship_strength > 0.3: |
|
|
edges.append({ |
|
|
"source": node1["id"], |
|
|
"target": node2["id"], |
|
|
"strength": relationship_strength, |
|
|
"relationship_type": "semantic" |
|
|
}) |
|
|
|
|
|
|
|
|
graph_metrics = self._calculate_graph_metrics(nodes, edges) |
|
|
|
|
|
return { |
|
|
"method": "graph_based", |
|
|
"nodes": nodes, |
|
|
"edges": edges, |
|
|
"graph_metrics": graph_metrics, |
|
|
"community_structure": self._detect_communities(nodes, edges) |
|
|
} |
|
|
|
|
|
async def _attention_based_synthesis( |
|
|
self, |
|
|
context_groups: List[List[ContextElement]], |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Perform attention-based context synthesis.""" |
|
|
|
|
|
|
|
|
attention_weights = {} |
|
|
total_elements = 0 |
|
|
|
|
|
for group_idx, group in enumerate(context_groups): |
|
|
for element in group: |
|
|
if element.relevance_score > quality_threshold: |
|
|
|
|
|
relevance_weight = element.relevance_score |
|
|
confidence_weight = element.confidence |
|
|
recency_weight = self._calculate_recency_weight(element.timestamp) |
|
|
diversity_weight = self._calculate_diversity_weight(element, context_groups) |
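                    # Weighted combination of four signals; the weights
                    # (0.4 / 0.3 / 0.2 / 0.1) favour relevance and confidence
                    # over recency and diversity.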
|
|
|
|
|
attention_score = (relevance_weight * 0.4 + |
|
|
confidence_weight * 0.3 + |
|
|
recency_weight * 0.2 + |
|
|
diversity_weight * 0.1) |
|
|
|
|
|
attention_weights[element.id] = attention_score |
|
|
total_elements += 1 |
|
|
|
|
|
|
|
|
if attention_weights: |
|
|
max_weight = max(attention_weights.values()) |
|
|
attention_weights = {k: v / max_weight for k, v in attention_weights.items()} |
|
|
|
|
|
|
|
|
attended_elements = [] |
|
|
for group_idx, group in enumerate(context_groups): |
|
|
for element in group: |
|
|
if element.id in attention_weights and attention_weights[element.id] > 0.1: |
|
|
attended_elements.append({ |
|
|
"id": element.id, |
|
|
"content": element.content, |
|
|
"group": group_idx, |
|
|
"attention_weight": attention_weights[element.id], |
|
|
"relevance_score": element.relevance_score |
|
|
}) |
|
|
|
|
|
return { |
|
|
"method": "attention_based", |
|
|
"attended_elements": attended_elements, |
|
|
"attention_distribution": self._calculate_attention_distribution(attention_weights), |
|
|
"attention_entropy": self._calculate_attention_entropy(attention_weights) |
|
|
} |
|
|
|
|
|
async def _ensemble_synthesis( |
|
|
self, |
|
|
context_groups: List[List[ContextElement]], |
|
|
quality_threshold: float |
|
|
) -> Dict[str, Any]: |
|
|
"""Perform ensemble-based context synthesis combining multiple methods.""" |
|
|
|
|
|
|
|
|
synthesis_results = {} |
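        # Run fusion, hierarchical and attention-based synthesis independently,
        # then merge their outputs and measure how much the methods agree.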
|
|
|
|
|
for method in [SynthesisMethod.FUSION, SynthesisMethod.HIERARCHICAL, SynthesisMethod.ATTENTION_BASED]: |
|
|
try: |
|
|
result = await self.synthesis_methods[method](context_groups, quality_threshold) |
|
|
synthesis_results[method.value] = result |
|
|
except Exception as e: |
|
|
logger.warning(f"Ensemble synthesis method {method.value} failed: {e}") |
|
|
|
|
|
|
|
|
ensemble_result = self._combine_synthesis_results(synthesis_results) |
|
|
|
|
|
return { |
|
|
"method": "ensemble", |
|
|
"individual_results": synthesis_results, |
|
|
"ensemble_result": ensemble_result, |
|
|
"method_agreement": self._calculate_method_agreement(synthesis_results) |
|
|
} |
|
|
|
|
|
def _remove_duplicates(self, elements: List[ContextElement]) -> List[ContextElement]: |
|
|
"""Remove duplicate elements based on content similarity.""" |
|
|
unique_elements = [] |
|
|
seen_content_hashes = set() |
|
|
|
|
|
for element in elements: |
|
|
content_hash = hashlib.md5(str(element.content).encode()).hexdigest() |
|
|
if content_hash not in seen_content_hashes: |
|
|
seen_content_hashes.add(content_hash) |
|
|
unique_elements.append(element) |
|
|
|
|
|
return unique_elements |
|
|
|
|
|
def _identify_source_group(self, element: ContextElement, context_groups: List[List[ContextElement]]) -> int: |
|
|
"""Identify which group an element belongs to.""" |
|
|
for group_idx, group in enumerate(context_groups): |
|
|
if element in group: |
|
|
return group_idx |
|
|
return -1 |
|
|
|
|
|
def _find_element_groups(self, element: ContextElement, context_groups: List[List[ContextElement]]) -> List[int]: |
|
|
"""Find all groups an element appears in.""" |
|
|
groups = [] |
|
|
for group_idx, group in enumerate(context_groups): |
|
|
if element in group: |
|
|
groups.append(group_idx) |
|
|
return groups |
|
|
|
|
|
def _calculate_quality_distribution(self, elements: List[ContextElement]) -> Dict[str, float]: |
|
|
"""Calculate quality score distribution.""" |
|
|
if not elements: |
|
|
return {} |
|
|
|
|
|
relevance_scores = [e.relevance_score for e in elements] |
|
|
confidence_scores = [e.confidence for e in elements] |
|
|
|
|
|
return { |
|
|
"mean_relevance": np.mean(relevance_scores), |
|
|
"std_relevance": np.std(relevance_scores), |
|
|
"mean_confidence": np.mean(confidence_scores), |
|
|
"std_confidence": np.std(confidence_scores) |
|
|
} |
|
|
|
|
|
def _create_cross_group_connections(self, context_groups: List[List[ContextElement]]) -> List[Dict[str, Any]]: |
|
|
"""Create connections between elements from different groups.""" |
|
|
connections = [] |
|
|
|
|
|
for i, group1 in enumerate(context_groups): |
|
|
for j, group2 in enumerate(context_groups[i+1:], i+1): |
|
|
for elem1 in group1: |
|
|
for elem2 in group2: |
|
|
similarity = self._calculate_element_similarity(elem1, elem2) |
|
|
if similarity > 0.5: |
|
|
connections.append({ |
|
|
"source": elem1.id, |
|
|
"target": elem2.id, |
|
|
"strength": similarity, |
|
|
"groups": (i, j) |
|
|
}) |
|
|
|
|
|
return connections |
|
|
|
|
|
def _calculate_element_similarity(self, elem1: ContextElement, elem2: ContextElement) -> float: |
|
|
"""Calculate similarity between two elements.""" |
|
|
|
|
|
common_metadata = set(elem1.metadata.keys()) & set(elem2.metadata.keys()) |
|
|
total_metadata = set(elem1.metadata.keys()) | set(elem2.metadata.keys()) |
|
|
|
|
|
if not total_metadata: |
|
|
return 0.0 |
|
|
|
|
|
metadata_similarity = len(common_metadata) / len(total_metadata) |
|
|
|
|
|
|
|
|
content1 = str(elem1.content).lower() |
|
|
content2 = str(elem2.content).lower() |
|
|
|
|
|
content_words1 = set(content1.split()) |
|
|
content_words2 = set(content2.split()) |
|
|
|
|
|
if not content_words1 and not content_words2: |
|
|
content_similarity = 1.0 |
|
|
elif not content_words1 or not content_words2: |
|
|
content_similarity = 0.0 |
|
|
else: |
|
|
content_similarity = len(content_words1 & content_words2) / len(content_words1 | content_words2) |
|
|
|
|
|
return (metadata_similarity + content_similarity) / 2 |
|
|
|
|
|
def _calculate_clustering_coefficient(self, edges: List[Dict[str, Any]], nodes: List[Dict[str, Any]]) -> float: |
|
|
"""Calculate clustering coefficient of the mesh.""" |
|
|
if len(nodes) < 3: |
|
|
return 0.0 |
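        # Local clustering coefficient per node: the fraction of possible edges
        # among a node's neighbours that actually exist, averaged over all nodes
        # with at least two neighbours.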
|
|
|
|
|
|
|
|
adjacency = defaultdict(set) |
|
|
for edge in edges: |
|
|
adjacency[edge["source"]].add(edge["target"]) |
|
|
adjacency[edge["target"]].add(edge["source"]) |
|
|
|
|
|
|
|
|
clustering_coeffs = [] |
|
|
for node in nodes: |
|
|
neighbors = adjacency[node["id"]] |
|
|
if len(neighbors) < 2: |
|
|
continue |
|
|
|
|
|
possible_edges = len(neighbors) * (len(neighbors) - 1) / 2 |
|
|
actual_edges = 0 |
|
|
|
|
|
for neighbor1 in neighbors: |
|
|
for neighbor2 in neighbors: |
|
|
if neighbor1 != neighbor2 and neighbor2 in adjacency[neighbor1]: |
|
|
actual_edges += 1 |
|
|
|
|
|
if possible_edges > 0: |
|
|
clustering_coeffs.append(actual_edges / possible_edges) |
|
|
|
|
|
return np.mean(clustering_coeffs) if clustering_coeffs else 0.0 |
|
|
|
|
|
def _analyze_cross_group_presence(self, context_groups: List[List[ContextElement]]) -> Dict[str, int]: |
|
|
"""Analyze which elements appear in multiple groups.""" |
|
|
element_presence = defaultdict(int) |
|
|
|
|
|
for group in context_groups: |
|
|
group_elements = set(elem.id for elem in group) |
|
|
for elem_id in group_elements: |
|
|
element_presence[elem_id] += 1 |
|
|
|
|
|
return dict(element_presence) |
|
|
|
|
|
def _calculate_relationship_strength(self, node1: Dict[str, Any], node2: Dict[str, Any]) -> float: |
|
|
"""Calculate relationship strength between two nodes.""" |
|
|
|
|
|
relevance_factor = min(node1["relevance_score"], node2["relevance_score"]) |
|
|
|
|
|
|
|
|
metadata1 = node1.get("metadata", {}) |
|
|
metadata2 = node2.get("metadata", {}) |
|
|
|
|
|
if not metadata1 and not metadata2: |
|
|
metadata_similarity = 1.0 |
|
|
elif not metadata1 or not metadata2: |
|
|
metadata_similarity = 0.0 |
|
|
else: |
|
|
common_keys = set(metadata1.keys()) & set(metadata2.keys()) |
|
|
total_keys = set(metadata1.keys()) | set(metadata2.keys()) |
|
|
metadata_similarity = len(common_keys) / max(len(total_keys), 1) |
|
|
|
|
|
return (relevance_factor + metadata_similarity) / 2 |
|
|
|
|
|
def _calculate_graph_metrics(self, nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> Dict[str, float]: |
|
|
"""Calculate metrics of the knowledge graph.""" |
|
|
if not nodes: |
|
|
return {} |
|
|
|
|
|
num_nodes = len(nodes) |
|
|
num_edges = len(edges) |
|
|
|
|
|
|
|
|
density = num_edges / max(num_nodes * (num_nodes - 1) / 2, 1) |
|
|
|
|
|
|
|
|
degrees = defaultdict(int) |
|
|
for edge in edges: |
|
|
degrees[edge["source"]] += 1 |
|
|
degrees[edge["target"]] += 1 |
|
|
|
|
|
avg_degree = np.mean(list(degrees.values())) if degrees else 0 |
|
|
|
|
|
return { |
|
|
"nodes": num_nodes, |
|
|
"edges": num_edges, |
|
|
"density": density, |
|
|
"average_degree": avg_degree, |
|
|
"connected_components": self._count_connected_components(nodes, edges) |
|
|
} |
|
|
|
|
|
def _count_connected_components(self, nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> int: |
|
|
"""Count connected components in the graph.""" |
|
|
if not nodes: |
|
|
return 0 |
|
|
|
|
|
|
|
|
adjacency = defaultdict(set) |
|
|
for edge in edges: |
|
|
adjacency[edge["source"]].add(edge["target"]) |
|
|
adjacency[edge["target"]].add(edge["source"]) |
|
|
|
|
|
|
|
|
visited = set() |
|
|
components = 0 |
|
|
|
|
|
for node in nodes: |
|
|
if node["id"] not in visited: |
|
|
self._dfs(node["id"], adjacency, visited) |
|
|
components += 1 |
|
|
|
|
|
return components |
|
|
|
|
|
def _dfs(self, node: str, adjacency: Dict[str, set], visited: set): |
|
|
"""Depth-first search for connected components.""" |
|
|
visited.add(node) |
|
|
for neighbor in adjacency[node]: |
|
|
if neighbor not in visited: |
|
|
self._dfs(neighbor, adjacency, visited) |
|
|
|
|
|
def _detect_communities(self, nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> List[List[str]]: |
|
|
"""Detect communities in the graph (simplified algorithm).""" |
|
|
|
|
|
if not nodes or not edges: |
|
|
return [[node["id"]] for node in nodes] |
|
|
|
|
|
|
|
|
communities = [] |
|
|
remaining_nodes = [node["id"] for node in nodes] |
|
|
|
|
|
while remaining_nodes: |
|
|
|
|
|
community = [remaining_nodes.pop(0)] |
|
|
|
|
|
|
|
|
to_add = True |
|
|
while to_add: |
|
|
to_add = False |
|
|
for node_id in remaining_nodes[:]: |
|
|
|
|
|
connections = sum(1 for edge in edges |
|
|
if (edge["source"] == node_id and edge["target"] in community) or |
|
|
(edge["target"] == node_id and edge["source"] in community)) |
|
|
|
|
|
if connections >= len(community) * 0.3: |
|
|
community.append(node_id) |
|
|
remaining_nodes.remove(node_id) |
|
|
to_add = True |
|
|
break |
|
|
|
|
|
communities.append(community) |
|
|
|
|
|
return communities |
|
|
|
|
|
def _calculate_recency_weight(self, timestamp: datetime) -> float: |
|
|
"""Calculate recency weight based on timestamp.""" |
|
|
age_seconds = (datetime.utcnow() - timestamp).total_seconds() |
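        # Linear decay over one day (86 400 s): elements older than 24 hours get
        # weight 0, a brand-new element gets weight 1.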
|
|
return max(0, 1 - age_seconds / 86400) |
|
|
|
|
|
def _calculate_diversity_weight(self, element: ContextElement, context_groups: List[List[ContextElement]]) -> float: |
|
|
"""Calculate diversity weight based on element uniqueness.""" |
|
|
|
|
|
content = str(element.content).lower() |
|
|
word_count = len(content.split()) |
|
|
|
|
|
|
|
|
diversity_score = min(1.0, word_count / 20.0) |
|
|
|
|
|
return diversity_score |
|
|
|
|
|
def _calculate_attention_distribution(self, attention_weights: Dict[str, float]) -> Dict[str, float]: |
|
|
"""Calculate attention weight distribution.""" |
|
|
if not attention_weights: |
|
|
return {} |
|
|
|
|
|
weights = list(attention_weights.values()) |
|
|
|
|
|
return { |
|
|
"mean": np.mean(weights), |
|
|
"std": np.std(weights), |
|
|
"min": np.min(weights), |
|
|
"max": np.max(weights), |
|
|
"top_10_percent": np.percentile(weights, 90) |
|
|
} |
|
|
|
|
|
def _calculate_attention_entropy(self, attention_weights: Dict[str, float]) -> float: |
|
|
"""Calculate entropy of attention distribution.""" |
|
|
if not attention_weights: |
|
|
return 0.0 |
|
|
|
|
|
weights = list(attention_weights.values()) |
|
|
total = sum(weights) |
|
|
|
|
|
if total == 0: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
probs = [w / total for w in weights] |
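        # Shannon entropy in bits, H = -sum(p * log2(p)); high entropy means the
        # attention is spread evenly across elements, low entropy means it is
        # concentrated on a few of them.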
|
|
|
|
|
|
|
|
entropy = -sum(p * np.log2(p) for p in probs if p > 0) |
|
|
|
|
|
return entropy |
|
|
|
|
|
def _combine_synthesis_results(self, synthesis_results: Dict[str, Any]) -> Dict[str, Any]: |
|
|
"""Combine results from multiple synthesis methods.""" |
|
|
combined_elements = {} |
|
|
|
|
|
|
|
|
for method_name, result in synthesis_results.items(): |
|
|
if "elements" in result: |
|
|
for element in result["elements"]: |
|
|
element_id = element["id"] |
|
|
if element_id not in combined_elements: |
|
|
combined_elements[element_id] = { |
|
|
"id": element_id, |
|
|
"content": element["content"], |
|
|
"methods": [method_name], |
|
|
"confidence_scores": [element.get("confidence", element.get("relevance_score", 0.5))] |
|
|
} |
|
|
else: |
|
|
combined_elements[element_id]["methods"].append(method_name) |
|
|
if "confidence" in element: |
|
|
combined_elements[element_id]["confidence_scores"].append(element["confidence"]) |
|
|
|
|
|
|
|
|
for element_id, element_data in combined_elements.items(): |
|
|
scores = element_data["confidence_scores"] |
|
|
element_data["combined_confidence"] = np.mean(scores) if scores else 0.5 |
|
|
element_data["method_agreement"] = len(element_data["methods"]) |
|
|
|
|
|
return { |
|
|
"combined_elements": list(combined_elements.values()), |
|
|
"total_unique_elements": len(combined_elements), |
|
|
"average_method_agreement": np.mean([elem["method_agreement"] for elem in combined_elements.values()]) |
|
|
} |
|
|
|
|
|
def _calculate_method_agreement(self, synthesis_results: Dict[str, Any]) -> float: |
|
|
"""Calculate agreement between synthesis methods.""" |
|
|
if len(synthesis_results) < 2: |
|
|
return 1.0 |
|
|
|
|
|
element_sets = [] |
|
|
for method_name, result in synthesis_results.items(): |
|
|
if "elements" in result: |
|
|
element_ids = set(elem["id"] for elem in result["elements"]) |
|
|
element_sets.append(element_ids) |
|
|
|
|
|
if not element_sets: |
|
|
return 0.0 |
|
|
|
|
|
|
|
|
similarities = [] |
|
|
for i in range(len(element_sets)): |
|
|
for j in range(i+1, len(element_sets)): |
|
|
intersection = len(element_sets[i] & element_sets[j]) |
|
|
union = len(element_sets[i] | element_sets[j]) |
|
|
similarity = intersection / max(union, 1) |
|
|
similarities.append(similarity) |
|
|
|
|
|
return np.mean(similarities) if similarities else 0.0 |
|
|
|
|
|
def _calculate_synthesis_quality(self, result: Dict[str, Any]) -> float: |
|
|
"""Calculate overall quality of synthesis result.""" |
|
|
if "elements" in result: |
|
|
elements = result["elements"] |
|
|
if not elements: |
|
|
return 0.0 |
|
|
|
|
|
avg_relevance = np.mean([elem.get("relevance_score", 0.5) for elem in elements]) |
|
|
avg_confidence = np.mean([elem.get("confidence", 0.5) for elem in elements]) |
|
|
|
|
|
return (avg_relevance + avg_confidence) / 2 |
|
|
|
|
|
return 0.5 |
|
|
|
|
|
def _calculate_synthesis_confidence(self, result: Dict[str, Any]) -> float: |
|
|
"""Calculate confidence in synthesis result.""" |
|
|
|
|
|
method_confidence = { |
|
|
"fusion": 0.8, |
|
|
"mesh": 0.7, |
|
|
"hierarchical": 0.9, |
|
|
"graph_based": 0.6, |
|
|
"attention_based": 0.8, |
|
|
"ensemble": 0.95 |
|
|
} |
|
|
|
|
|
base_confidence = method_confidence.get(result.get("method", ""), 0.7) |
|
|
|
|
|
|
|
|
if "elements" in result: |
|
|
coverage_bonus = min(0.2, len(result["elements"]) / 100) |
|
|
return min(1.0, base_confidence + coverage_bonus) |
|
|
|
|
|
return base_confidence |
|
|
|
|
|
def _calculate_temporal_scope(self, context_groups: List[List[ContextElement]]) -> Tuple[datetime, datetime]: |
|
|
"""Calculate temporal scope of synthesized context.""" |
|
|
all_timestamps = [] |
|
|
for group in context_groups: |
|
|
for element in group: |
|
|
all_timestamps.append(element.timestamp) |
|
|
|
|
|
if all_timestamps: |
|
|
return (min(all_timestamps), max(all_timestamps)) |
|
|
else: |
|
|
current_time = datetime.utcnow() |
|
|
return (current_time, current_time) |
|
|
|
|
|
def _identify_semantic_clusters(self, result: Dict[str, Any]) -> List[Dict[str, Any]]: |
|
|
"""Identify semantic clusters in the synthesis result.""" |
|
|
clusters = [] |
|
|
|
|
|
if "elements" in result: |
|
|
elements = result["elements"] |
|
|
|
|
|
|
|
|
if len(elements) > 1: |
|
|
|
|
|
themes = defaultdict(list) |
|
|
for element in elements: |
|
|
|
|
|
content = str(element.get("content", "")).lower() |
|
|
if "urgent" in content or "asap" in content: |
|
|
themes["urgency"].append(element) |
|
|
elif "team" in content or "collaborate" in content: |
|
|
themes["collaboration"].append(element) |
|
|
elif "data" in content or "analysis" in content: |
|
|
themes["data_analysis"].append(element) |
|
|
else: |
|
|
themes["general"].append(element) |
|
|
|
|
|
for theme, theme_elements in themes.items(): |
|
|
if len(theme_elements) > 1: |
|
|
clusters.append({ |
|
|
"theme": theme, |
|
|
"elements": [elem["id"] for elem in theme_elements], |
|
|
"size": len(theme_elements), |
|
|
"coherence": len(theme_elements) / len(elements) |
|
|
}) |
|
|
|
|
|
return clusters |
|
|
|
|
|
|
|
|
|
|
|
class ContextCompressionAndSynthesis: |
|
|
"""Integrated context compression and synthesis system.""" |
|
|
|
|
|
def __init__(self): |
|
|
self.compression_engine = ContextCompressionEngine() |
|
|
self.synthesis_engine = ContextSynthesisEngine() |
|
|
|
|
|
async def process_context_pipeline( |
|
|
self, |
|
|
input_context: List[ContextElement], |
|
|
compression_strategy: CompressionStrategy = CompressionStrategy.ADAPTIVE, |
|
|
synthesis_method: SynthesisMethod = SynthesisMethod.FUSION, |
|
|
quality_threshold: float = 0.6 |
|
|
) -> Dict[str, Any]: |
|
|
"""Process context through compression and synthesis pipeline.""" |
|
|
|
|
|
|
|
|
compression_result = await self.compression_engine.compress_context( |
|
|
input_context, compression_strategy, quality_threshold |
|
|
) |
|
|
|
|
|
|
|
|
compressed_elements = self._extract_compressed_elements(compression_result) |
|
|
|
|
|
|
|
|
|
|
|
synthesis_result = await self.synthesis_engine.synthesize_contexts( |
|
|
[compressed_elements], synthesis_method, quality_threshold |
|
|
) |
|
|
|
|
|
return { |
|
|
"compression": compression_result, |
|
|
"synthesis": { |
|
|
"id": synthesis_result.id, |
|
|
"method": synthesis_result.synthesis_method.value, |
|
|
"quality_score": synthesis_result.quality_score, |
|
|
"confidence": synthesis_result.confidence, |
|
|
"temporal_scope": { |
|
|
"start": synthesis_result.temporal_scope[0].isoformat(), |
|
|
"end": synthesis_result.temporal_scope[1].isoformat() |
|
|
}, |
|
|
"semantic_clusters": synthesis_result.semantic_clusters |
|
|
}, |
|
|
"pipeline_metrics": self._calculate_pipeline_metrics(compression_result, synthesis_result) |
|
|
} |
|
|
|
|
|
def _extract_compressed_elements(self, compression_result: Dict[str, Any]) -> List[ContextElement]: |
|
|
"""Extract context elements from compression result.""" |
|
|
elements = [] |
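        # Note: only hierarchical compression results are unpacked here; other
        # strategies currently yield an empty element list, so synthesis then
        # runs on an empty group.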
|
|
|
|
|
if compression_result.get("strategy") == "hierarchical": |
|
|
for level_name, level_data in compression_result.get("levels", {}).items(): |
|
|
for compressed_elem in level_data: |
|
|
if isinstance(compressed_elem, dict): |
|
|
|
|
|
elements.append(type('ContextElement', (), { |
|
|
'id': compressed_elem.get("id", f"compressed_{level_name}_{len(elements)}"), |
|
|
'content': compressed_elem.get("content", compressed_elem.get("summary", "")), |
|
|
'relevance_score': compressed_elem.get("relevance_score", 0.5), |
|
|
'confidence': compressed_elem.get("confidence", 0.5), |
|
|
'timestamp': datetime.utcnow(), |
|
|
'metadata': compressed_elem.get("metadata", {}) |
|
|
})()) |
|
|
|
|
|
return elements |
|
|
|
|
|
def _calculate_pipeline_metrics( |
|
|
self, |
|
|
compression_result: Dict[str, Any], |
|
|
synthesis_result: SynthesizedContext |
|
|
) -> Dict[str, Any]: |
|
|
"""Calculate metrics for the compression-synthesis pipeline.""" |
|
|
|
|
|
return { |
|
|
"compression_ratio": compression_result.get("compression_ratio", 0), |
|
|
"synthesis_quality": synthesis_result.quality_score, |
|
|
"synthesis_confidence": synthesis_result.confidence, |
|
|
"semantic_clusters": len(synthesis_result.semantic_clusters), |
|
|
"temporal_span_hours": (synthesis_result.temporal_scope[1] - synthesis_result.temporal_scope[0]).total_seconds() / 3600, |
|
|
"processing_efficiency": synthesis_result.quality_score / max(compression_result.get("compression_ratio", 1), 0.1) |
|
|
} |
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    print("Context Compression and Synthesis System Initialized")
    print("=" * 60)
    system = ContextCompressionAndSynthesis()
    print("Ready for advanced context compression and multi-source synthesis!")
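
    # Minimal end-to-end smoke test (illustrative sketch only). The real
    # ContextElement class lives in ai_agent_framework and its constructor is
    # not shown in this module, so lightweight SimpleNamespace stand-ins that
    # expose the attributes the engines read (id, content, modality,
    # relevance_score, confidence, timestamp, metadata) are used instead.
    from types import SimpleNamespace

    def _demo_element(idx: int) -> SimpleNamespace:
        """Build a stand-in context element for the demo pipeline run."""
        return SimpleNamespace(
            id=f"demo_{idx}",
            content=f"Demo context element {idx} about data analysis and metrics",
            modality=ContextModality.TEXT,
            relevance_score=0.6 + 0.05 * idx,
            confidence=0.7,
            timestamp=datetime.utcnow() - timedelta(minutes=idx),
            metadata={"source": "demo", "index": idx},
        )

    demo_elements = [_demo_element(i) for i in range(5)]
    demo_result = asyncio.run(system.process_context_pipeline(demo_elements))
    print(f"Selected compression strategy: {demo_result['compression'].get('strategy')}")
    print(f"Synthesis quality score: {demo_result['synthesis']['quality_score']:.2f}")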