"""
Context Management with Dynamic Sizing System
=============================================

Advanced context management system with dynamic window sizing, relevance scoring,
context expiry and refresh protocols, and conflict resolution strategies.
"""

import asyncio
import json
import logging
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Set, Tuple, Union, Callable
from dataclasses import dataclass, field, asdict
from enum import Enum
import numpy as np
from collections import defaultdict, deque
import heapq
from functools import lru_cache
import threading
from concurrent.futures import ThreadPoolExecutor

from ai_agent_framework.core.context_engineering_agent import (
    ContextElement, ContextModality, ContextDimension
)

logger = logging.getLogger(__name__)


class ContextPriority(Enum):
    """Context priority levels."""
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    ARCHIVED = "archived"


class SizingStrategy(Enum):
    """Dynamic sizing strategies."""
    FIXED = "fixed"
    ADAPTIVE = "adaptive"
    PREDICTIVE = "predictive"
    OPTIMIZED = "optimized"
    COMPRESSIVE = "compressive"


class RefreshTrigger(Enum):
    """Context refresh triggers."""
    TIME_BASED = "time_based"
    RELEVANCE_BASED = "relevance_based"
    INTERACTION_BASED = "interaction_based"
    QUALITY_BASED = "quality_based"
    CAPACITY_BASED = "capacity_based"


@dataclass
class ContextItem:
    """Individual context item with metadata."""
    id: str
    content: Any
    modality: ContextModality
    dimension: ContextDimension
    priority: ContextPriority
    timestamp: datetime
    expiry_time: Optional[datetime]
    relevance_score: float
    quality_score: float
    access_count: int
    last_accessed: datetime
    dependencies: Set[str]
    metadata: Dict[str, Any]

    def __post_init__(self):
        if not self.id:
            self.id = f"context_{int(time.time())}_{hash(str(self.content))}"
        if not self.timestamp:
            self.timestamp = datetime.utcnow()
        if not self.last_accessed:
            self.last_accessed = self.timestamp
        if not self.metadata:
            self.metadata = {}


@dataclass
class ContextWindow:
    """Context window with dynamic sizing capabilities."""
    window_id: str
    size_limit: int
    current_size: int
    strategy: SizingStrategy
    items: List[ContextItem]
    metrics: Dict[str, float]
    created_at: datetime
    last_resized: datetime

    def __post_init__(self):
        if not self.window_id:
            self.window_id = f"window_{int(time.time())}"
        if not self.created_at:
            self.created_at = datetime.utcnow()
        if not self.last_resized:
            self.last_resized = self.created_at
        if not self.metrics:
            self.metrics = {}
        if not self.items:
            self.items = []


@dataclass
class ContextConflict:
    """Represents a conflict between context items."""
    conflict_id: str
    conflicting_items: List[str]
    conflict_type: str
    resolution_strategy: str
    confidence: float
    created_at: datetime

    def __post_init__(self):
        if not self.conflict_id:
            self.conflict_id = f"conflict_{int(time.time())}"
        if not self.created_at:
            self.created_at = datetime.utcnow()


class ContextManager:
    """Core context management engine with dynamic sizing."""

    def __init__(self, max_context_windows: int = 10):
        self.max_context_windows = max_context_windows
        self.context_windows = {}
        self.context_index = {}
        self.refresh_scheduler = {}
        self.conflict_resolver = ContextConflictResolver()
        self.sizing_algorithms = {
            SizingStrategy.FIXED: self._fixed_sizing,
            SizingStrategy.ADAPTIVE: self._adaptive_sizing,
            SizingStrategy.PREDICTIVE: self._predictive_sizing,
            SizingStrategy.OPTIMIZED: self._optimized_sizing,
            SizingStrategy.COMPRESSIVE: self._compressive_sizing
        }

        self.refresh_handlers = {
            RefreshTrigger.TIME_BASED: self._time_based_refresh,
            RefreshTrigger.RELEVANCE_BASED: self._relevance_based_refresh,
            RefreshTrigger.INTERACTION_BASED: self._interaction_based_refresh,
            RefreshTrigger.QUALITY_BASED: self._quality_based_refresh,
            RefreshTrigger.CAPACITY_BASED: self._capacity_based_refresh
        }

        self.metrics = {
            "total_windows": 0,
            "total_items": 0,
            "average_window_utilization": 0.0,
            "refresh_frequency": 0.0,
            "conflict_resolution_rate": 0.0,
            "relevance_retention": 0.0
        }

        self._lock = threading.RLock()

    async def create_context_window(
        self,
        window_id: Optional[str] = None,
        size_limit: int = 100,
        strategy: SizingStrategy = SizingStrategy.ADAPTIVE
    ) -> ContextWindow:
        """Create a new context window with the specified strategy."""

        with self._lock:
            if len(self.context_windows) >= self.max_context_windows:
                # Evict the oldest window to stay within the window budget.
                oldest_window_id = min(
                    self.context_windows.keys(),
                    key=lambda w_id: self.context_windows[w_id].created_at
                )
                await self._remove_context_window(oldest_window_id)

            window = ContextWindow(
                window_id=window_id,
                size_limit=size_limit,
                current_size=0,
                strategy=strategy,
                items=[],
                metrics={},
                created_at=datetime.utcnow(),
                last_resized=datetime.utcnow()
            )

            self.context_windows[window.window_id] = window
            self.metrics["total_windows"] = len(self.context_windows)

            logger.info(f"Created context window {window.window_id} with strategy {strategy.value}")
            return window

    async def add_context_item(
        self,
        window_id: str,
        item: ContextItem,
        refresh_trigger: Optional[RefreshTrigger] = None
    ) -> Dict[str, Any]:
        """Add a context item to a window with dynamic sizing."""

        if window_id not in self.context_windows:
            raise ValueError(f"Window {window_id} does not exist")

        window = self.context_windows[window_id]

        # Check the new item against existing items before admitting it.
        conflicts = await self._detect_conflicts(window, item)

        if conflicts:
            resolved = await self._resolve_conflicts(window, conflicts, item)
            if not resolved:
                logger.warning(f"Failed to resolve conflicts for item {item.id}")
                return {"status": "rejected", "reason": "unresolved_conflicts"}

        # Track which windows reference this item.
        if item.id not in self.context_index:
            self.context_index[item.id] = []
        self.context_index[item.id].append(window_id)

        if window.current_size < window.size_limit:
            # Room available: add the item directly.
            window.items.append(item)
            window.current_size += 1
            result = {"status": "added_directly"}
        else:
            # Window is full: delegate to the window's sizing strategy.
            sizing_func = self.sizing_strategies[window.strategy]
            result = await sizing_func(window, item)
            # Sizing strategies may add or remove items; keep the size counter in sync.
            window.current_size = len(window.items)

        if refresh_trigger:
            await self._schedule_refresh(window_id, refresh_trigger)

        await self._update_window_metrics(window)

        return {
            "status": result["status"],
            "item_id": item.id,
            "window_id": window_id,
            "conflicts_resolved": len(conflicts) > 0,
            "new_size": window.current_size
        }

    async def get_context_items(
        self,
        window_id: str,
        limit: Optional[int] = None,
        include_metadata: bool = True
    ) -> Dict[str, Any]:
        """Retrieve context items from a window, ordered by combined score."""

        if window_id not in self.context_windows:
            raise ValueError(f"Window {window_id} does not exist")

        window = self.context_windows[window_id]

        sorted_items = await self._sort_context_items(window.items)

        if limit:
            sorted_items = sorted_items[:limit]

        items_data = []
        for item in sorted_items:
            item_data = {
                "id": item.id,
                "content": item.content,
                "modality": item.modality.value,
                "dimension": item.dimension.value,
                "priority": item.priority.value,
                "relevance_score": item.relevance_score,
                "quality_score": item.quality_score,
                "timestamp": item.timestamp.isoformat()
            }

            if include_metadata:
                item_data.update({
                    "access_count": item.access_count,
                    "dependencies": list(item.dependencies),
                    "metadata": item.metadata
                })

            items_data.append(item_data)

        return {
            "window_id": window_id,
            "items": items_data,
            "total_items": len(window.items),
            "window_utilization": window.current_size / window.size_limit,
            "metrics": window.metrics
        }

    async def refresh_context_window(
        self,
        window_id: str,
        force_refresh: bool = False
    ) -> Dict[str, Any]:
        """Refresh a context window based on the configured strategy."""

        if window_id not in self.context_windows:
            raise ValueError(f"Window {window_id} does not exist")

        window = self.context_windows[window_id]

        if not force_refresh:
            refresh_needed = await self._should_refresh(window)
            if not refresh_needed["needed"]:
                return {
                    "status": "skipped",
                    "reason": refresh_needed["reason"]
                }

        refresh_strategy = await self._determine_refresh_strategy(window)

        refresh_func = self.refresh_handlers[refresh_strategy]
        refresh_result = await refresh_func(window_id)

        await self._update_refresh_metrics(window, refresh_result)

        return {
            "status": "refreshed",
            "strategy": refresh_strategy.value,
            "items_affected": refresh_result.get("items_affected", 0),
            "new_window_utilization": window.current_size / window.size_limit
        }

    async def optimize_context_window(
        self,
        window_id: str,
        optimization_goals: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Optimize a context window for better performance."""

        if window_id not in self.context_windows:
            raise ValueError(f"Window {window_id} does not exist")

        window = self.context_windows[window_id]

        if not optimization_goals:
            optimization_goals = ["relevance", "efficiency", "quality"]

        optimization_results = {}

        for goal in optimization_goals:
            if goal == "relevance":
                optimization_results["relevance"] = await self._optimize_relevance(window)
            elif goal == "efficiency":
                optimization_results["efficiency"] = await self._optimize_efficiency(window)
            elif goal == "quality":
                optimization_results["quality"] = await self._optimize_quality(window)
            elif goal == "diversity":
                optimization_results["diversity"] = await self._optimize_diversity(window)

        total_improvements = sum(
            1 for result in optimization_results.values() if result.get("improved", False)
        )

        await self._update_window_metrics(window)

        return {
            "status": "optimized",
            "optimization_goals": optimization_goals,
            "improvements_made": total_improvements,
            "optimization_results": optimization_results,
            "new_metrics": window.metrics
        }

    async def resolve_context_conflicts(
        self,
        window_id: str,
        conflict_resolution_strategy: str = "priority_based"
    ) -> Dict[str, Any]:
        """Resolve conflicts within a context window."""

        if window_id not in self.context_windows:
            raise ValueError(f"Window {window_id} does not exist")

        window = self.context_windows[window_id]

        all_conflicts = await self._detect_all_conflicts(window)

        if not all_conflicts:
            return {
                "status": "no_conflicts",
                "conflicts_resolved": 0
            }

        resolved_count = 0
        resolution_details = []

        for conflict in all_conflicts:
            resolution_result = await self._resolve_single_conflict(
                window, conflict, conflict_resolution_strategy
            )

            if resolution_result["resolved"]:
                resolved_count += 1
                resolution_details.append(resolution_result)

        self.metrics["conflict_resolution_rate"] = resolved_count / len(all_conflicts)

        return {
            "status": "conflicts_resolved",
            "total_conflicts": len(all_conflicts),
            "conflicts_resolved": resolved_count,
            "resolution_rate": resolved_count / len(all_conflicts),
            "resolution_details": resolution_details
        }

    async def _fixed_sizing(self, window: ContextWindow, new_item: ContextItem) -> Dict[str, Any]:
        """Fixed sizing strategy: replace the weakest item if the new item outranks it."""

        # Rank items by priority weight, breaking ties on relevance.
        def rank(item: ContextItem) -> Tuple[float, float]:
            return (self._priority_weight(item.priority), item.relevance_score)

        lowest_idx = min(range(len(window.items)), key=lambda i: rank(window.items[i]))
        weakest = window.items[lowest_idx]

        if rank(new_item) > rank(weakest):
            # Replace the weakest existing item with the new one.
            removed_item = window.items.pop(lowest_idx)
            window.items.append(new_item)

            return {
                "status": "replaced",
                "replaced_item_id": removed_item.id,
                "new_item_id": new_item.id
            }
        else:
            return {
                "status": "rejected",
                "reason": "lower_priority_than_existing"
            }

    async def _adaptive_sizing(self, window: ContextWindow, new_item: ContextItem) -> Dict[str, Any]:
        """Adaptive sizing strategy: adjust contents dynamically based on relevance."""

        # Use the 25th percentile of current relevance as the admission threshold.
        current_relevances = [item.relevance_score for item in window.items]
        threshold = float(np.percentile(current_relevances, 25)) if current_relevances else 0.0

        if new_item.relevance_score < threshold:
            return {
                "status": "rejected",
                "reason": "below_relevance_threshold",
                "threshold": threshold
            }

        # Score existing items; the lowest combined relevance/priority score is removed first.
        removal_candidates = []
        for item in window.items:
            score = (item.relevance_score * 0.7) + (self._priority_weight(item.priority) * 0.3)
            removal_candidates.append((score, item))

        removal_candidates.sort(key=lambda x: x[0])

        # Remove the weakest items until there is room for the new one.
        items_removed = 0
        for score, item in removal_candidates:
            if len(window.items) + 1 <= window.size_limit:
                break
            window.items.remove(item)
            items_removed += 1

        window.items.append(new_item)

        return {
            "status": "adaptive_replaced",
            "items_removed": items_removed,
            "new_item_id": new_item.id,
            "relevance_threshold": threshold
        }

    async def _predictive_sizing(self, window: ContextWindow, new_item: ContextItem) -> Dict[str, Any]:
        """Predictive sizing strategy: use access patterns to predict future relevance."""

        access_patterns = await self._analyze_access_patterns(window)

        # Predict how relevant each existing item will be in the future.
        predicted_relevances = {}
        for item in window.items:
            predicted_relevances[item.id] = await self._predict_future_relevance(item, access_patterns)

        new_item_predicted = await self._predict_future_relevance(new_item, access_patterns)

        # Rank existing items by a blend of predicted and current relevance.
        replacement_candidates = []
        for item in window.items:
            predicted = predicted_relevances[item.id]
            current = item.relevance_score
            combined_score = (predicted * 0.6) + (current * 0.4)
            replacement_candidates.append((combined_score, item))

        replacement_candidates.sort(key=lambda x: x[0])

        if replacement_candidates and replacement_candidates[0][0] < new_item_predicted:
            replaced_item = replacement_candidates[0][1]
            window.items.remove(replaced_item)
            window.items.append(new_item)

            return {
                "status": "predictive_replaced",
                "replaced_item_id": replaced_item.id,
                "new_item_id": new_item.id,
                "predicted_improvement": new_item_predicted - replacement_candidates[0][0]
            }
        else:
            return {
                "status": "rejected",
                "reason": "no_predictive_benefit"
            }

    async def _optimized_sizing(self, window: ContextWindow, new_item: ContextItem) -> Dict[str, Any]:
        """Optimized sizing strategy: keep the subset of items that maximizes overall utility."""

        all_items = window.items + [new_item]
        utilities = {}

        for item in all_items:
            utilities[item.id] = await self._calculate_item_utility(item, window)

        # Keep the highest-utility items that fit within the size limit.
        sorted_items = sorted(all_items, key=lambda x: utilities[x.id], reverse=True)
        optimal_items = sorted_items[:window.size_limit]

        current_ids = {item.id for item in window.items}
        optimal_ids = {item.id for item in optimal_items}

        if current_ids != optimal_ids:
            window.items = optimal_items

            removed_items = current_ids - optimal_ids
            added_items = optimal_ids - current_ids

            return {
                "status": "optimized",
                "removed_items": list(removed_items),
                "added_items": list(added_items),
                "total_utility": sum(utilities[item.id] for item in optimal_items)
            }
        else:
            return {
                "status": "no_change",
                "reason": "already_optimal"
            }

    async def _compressive_sizing(self, window: ContextWindow, new_item: ContextItem) -> Dict[str, Any]:
        """Compressive sizing strategy: compress redundant information before growing the window."""

        compression_result = await self._compress_information(window.items, new_item)

        if compression_result["compressed"]:
            window.items = compression_result["compressed_items"]

            return {
                "status": "compressed",
                "compression_ratio": compression_result["ratio"],
                "items_removed": compression_result["items_removed"],
                "information_preserved": compression_result["information_preserved"]
            }
        else:
            # Nothing to compress; fall back to adaptive sizing.
            return await self._adaptive_sizing(window, new_item)

    async def _time_based_refresh(self, window_id: str) -> Dict[str, Any]:
        """Time-based refresh strategy."""

        window = self.context_windows[window_id]
        refresh_threshold = timedelta(minutes=30)

        current_time = datetime.utcnow()
        items_to_refresh = []
        items_to_remove = []

        for item in window.items:
            age = current_time - item.last_accessed

            if age > refresh_threshold:
                if item.priority.value == "low":
                    items_to_remove.append(item)
                else:
                    items_to_refresh.append(item)

        for item in items_to_refresh:
            await self._refresh_item_relevance(item)

        for item in items_to_remove:
            window.items.remove(item)
            window.current_size -= 1

        return {
            "items_refreshed": len(items_to_refresh),
            "items_removed": len(items_to_remove),
            "refresh_type": "time_based"
        }

    async def _relevance_based_refresh(self, window_id: str) -> Dict[str, Any]:
        """Relevance-based refresh strategy."""

        window = self.context_windows[window_id]

        for item in window.items:
            await self._recalculate_relevance(item)

        # Drop items whose recalculated relevance falls below the floor.
        relevance_threshold = 0.3
        items_to_remove = [
            item for item in window.items
            if item.relevance_score < relevance_threshold
        ]

        for item in items_to_remove:
            window.items.remove(item)
            window.current_size -= 1

        return {
            "items_recalculated": len(window.items),
            "items_removed": len(items_to_remove),
            "refresh_type": "relevance_based"
        }

    async def _interaction_based_refresh(self, window_id: str) -> Dict[str, Any]:
        """Interaction-based refresh strategy."""

        window = self.context_windows[window_id]

        recent_interactions = await self._get_recent_interactions(window)

        items_updated = 0
        for item in window.items:
            old_relevance = item.relevance_score
            new_relevance = await self._update_relevance_from_interactions(item, recent_interactions)

            if abs(new_relevance - old_relevance) > 0.1:
                item.relevance_score = new_relevance
                items_updated += 1

        return {
            "items_updated": items_updated,
            "refresh_type": "interaction_based"
        }

    async def _quality_based_refresh(self, window_id: str) -> Dict[str, Any]:
        """Quality-based refresh strategy."""

        window = self.context_windows[window_id]

        quality_updates = 0
        for item in window.items:
            old_quality = item.quality_score
            new_quality = await self._recalculate_quality(item)

            if new_quality != old_quality:
                item.quality_score = new_quality
                quality_updates += 1

        window.items.sort(key=lambda x: x.quality_score, reverse=True)

        return {
            "quality_updates": quality_updates,
            "refresh_type": "quality_based"
        }

    async def _capacity_based_refresh(self, window_id: str) -> Dict[str, Any]:
        """Capacity-based refresh strategy."""

        window = self.context_windows[window_id]

        utilization = window.current_size / window.size_limit

        if utilization > 0.9:
            # Sort ascending by priority weight and relevance so the weakest items come first.
            sorted_items = sorted(
                window.items,
                key=lambda x: (self._priority_weight(x.priority), x.relevance_score)
            )

            # Free roughly 20% of the window by dropping the weakest items.
            remove_count = int(len(sorted_items) * 0.2)
            items_to_remove = sorted_items[:remove_count]

            for item in items_to_remove:
                window.items.remove(item)
                window.current_size -= 1

            return {
                "items_removed": len(items_to_remove),
                "capacity_freed": len(items_to_remove),
                "refresh_type": "capacity_based"
            }
        else:
            return {
                "items_removed": 0,
                "refresh_type": "capacity_based"
            }

    def _priority_weight(self, priority: ContextPriority) -> float:
        """Get the numeric weight for a priority level."""
        weights = {
            ContextPriority.CRITICAL: 1.0,
            ContextPriority.HIGH: 0.8,
            ContextPriority.MEDIUM: 0.6,
            ContextPriority.LOW: 0.4,
            ContextPriority.ARCHIVED: 0.2
        }
        return weights.get(priority, 0.5)

    async def _sort_context_items(self, items: List[ContextItem]) -> List[ContextItem]:
        """Sort context items by relevance, quality, and recency."""

        def sort_key(item):
            recency_score = 1.0 / (1.0 + (datetime.utcnow() - item.last_accessed).total_seconds() / 3600)
            combined_score = (item.relevance_score * 0.7) + (item.quality_score * 0.2) + (recency_score * 0.1)
            return combined_score

        return sorted(items, key=sort_key, reverse=True)

    async def _detect_conflicts(self, window: ContextWindow, new_item: ContextItem) -> List[ContextConflict]:
        """Detect conflicts between a new item and existing items."""

        conflicts = []

        for existing_item in window.items:
            if await self._are_conflicting(existing_item, new_item):
                conflict = ContextConflict(
                    conflict_id=f"conflict_{existing_item.id}_{new_item.id}",
                    conflicting_items=[existing_item.id, new_item.id],
                    conflict_type="content_conflict",
                    resolution_strategy="priority_based",
                    confidence=0.8,
                    created_at=datetime.utcnow()
                )
                conflicts.append(conflict)

        return conflicts

    async def _are_conflicting(self, item1: ContextItem, item2: ContextItem) -> bool:
        """Check whether two items conflict."""

        # Items that declare a dependency on each other are treated as conflicting.
        if item2.id in item1.dependencies or item1.id in item2.dependencies:
            return True

        # Content-level conflict detection (e.g. contradictory items sharing the same
        # modality and dimension) is not implemented yet; everything else is compatible.
        return False

    async def _resolve_conflicts(
        self,
        window: ContextWindow,
        conflicts: List[ContextConflict],
        new_item: ContextItem
    ) -> bool:
        """Resolve conflicts and update the window; return False if any conflict is unresolved."""

        resolver = self.conflict_resolver

        for conflict in conflicts:
            resolution_result = await resolver.resolve_conflict(window, conflict)

            if not resolution_result["success"]:
                return False

        return True

    async def _detect_all_conflicts(self, window: ContextWindow) -> List[ContextConflict]:
        """Detect all pairwise conflicts in a window."""

        conflicts = []
        items = window.items

        for i in range(len(items)):
            for j in range(i + 1, len(items)):
                if await self._are_conflicting(items[i], items[j]):
                    conflict = ContextConflict(
                        conflict_id=f"conflict_{items[i].id}_{items[j].id}",
                        conflicting_items=[items[i].id, items[j].id],
                        conflict_type="pairwise_conflict",
                        resolution_strategy="priority_based",
                        confidence=0.8,
                        created_at=datetime.utcnow()
                    )
                    conflicts.append(conflict)

        return conflicts

    async def _resolve_single_conflict(
        self,
        window: ContextWindow,
        conflict: ContextConflict,
        strategy: str
    ) -> Dict[str, Any]:
        """Resolve a single conflict."""

        items = {item.id: item for item in window.items}
        conflicting_ids = conflict.conflicting_items

        if not all(item_id in items for item_id in conflicting_ids):
            return {"resolved": False, "reason": "missing_items"}

        if strategy == "priority_based":
            # Keep the highest-priority item; use numeric weights rather than the enum's
            # string values so CRITICAL really outranks LOW.
            items_list = [items[item_id] for item_id in conflicting_ids]
            items_list.sort(key=lambda x: self._priority_weight(x.priority), reverse=True)

            winner = items_list[0]
            losers = items_list[1:]

            for loser in losers:
                window.items.remove(loser)
                window.current_size -= 1

            return {
                "resolved": True,
                "winner": winner.id,
                "losers": [l.id for l in losers],
                "strategy": "priority_based"
            }

        elif strategy == "relevance_based":
            # Keep the most relevant item.
            items_list = [items[item_id] for item_id in conflicting_ids]
            items_list.sort(key=lambda x: x.relevance_score, reverse=True)

            winner = items_list[0]
            losers = items_list[1:]

            for loser in losers:
                window.items.remove(loser)
                window.current_size -= 1

            return {
                "resolved": True,
                "winner": winner.id,
                "losers": [l.id for l in losers],
                "strategy": "relevance_based"
            }

        return {"resolved": False, "reason": "unknown_strategy"}

    async def _analyze_access_patterns(self, window: ContextWindow) -> Dict[str, Any]:
        """Analyze historical access patterns for a window."""

        access_patterns = {
            "frequent_items": [],
            "recent_activity": {},
            "access_distribution": {}
        }

        for item in window.items:
            if item.access_count > 5:
                access_patterns["frequent_items"].append(item.id)

            access_patterns["recent_activity"][item.id] = item.access_count
            access_patterns["access_distribution"][item.priority.value] = \
                access_patterns["access_distribution"].get(item.priority.value, 0) + 1

        return access_patterns

    async def _predict_future_relevance(self, item: ContextItem, patterns: Dict[str, Any]) -> float:
        """Predict the future relevance of an item."""

        base_relevance = item.relevance_score

        # Frequently accessed items are boosted, capped at +50%.
        access_factor = min(1.5, 1.0 + (item.access_count * 0.1))

        # Items untouched for up to a week decay toward half their relevance.
        recency_hours = (datetime.utcnow() - item.last_accessed).total_seconds() / 3600
        recency_factor = max(0.5, 1.0 - (recency_hours / 168))

        predicted_relevance = base_relevance * access_factor * recency_factor
        return min(1.0, predicted_relevance)

    async def _calculate_item_utility(self, item: ContextItem, window: ContextWindow) -> float:
        """Calculate the utility score for an item."""

        relevance_utility = item.relevance_score * 0.4
        quality_utility = item.quality_score * 0.3
        priority_utility = self._priority_weight(item.priority) * 0.2
        recency_utility = 1.0 / (1.0 + (datetime.utcnow() - item.last_accessed).total_seconds() / 3600) * 0.1

        return relevance_utility + quality_utility + priority_utility + recency_utility

    async def _identify_redundant_items(self, items: List[ContextItem]) -> List[str]:
        """Identify redundant items in the list."""

        redundant_ids = []

        # Group items by modality and dimension; redundancy only makes sense within a group.
        groups = defaultdict(list)
        for item in items:
            key = f"{item.modality.value}_{item.dimension.value}"
            groups[key].append(item)

        for group_items in groups.values():
            if len(group_items) > 1:
                group_items.sort(key=lambda x: (x.relevance_score, x.quality_score), reverse=True)

                # Items well below the best item in the group are considered redundant.
                threshold = group_items[0].relevance_score * 0.8
                for item in group_items[1:]:
                    if item.relevance_score < threshold:
                        redundant_ids.append(item.id)

        return redundant_ids

    async def _compress_information(self, items: List[ContextItem], new_item: ContextItem) -> Dict[str, Any]:
        """Compress redundant information."""

        all_items = items + [new_item]
        redundant_ids = await self._identify_redundant_items(all_items)

        if not redundant_ids:
            return {"compressed": False}

        compressed_items = [item for item in all_items if item.id not in redundant_ids]

        return {
            "compressed": True,
            "compressed_items": compressed_items,
            "items_removed": len(redundant_ids),
            "ratio": len(compressed_items) / len(all_items),
            "information_preserved": sum(item.quality_score for item in compressed_items) /
                                     max(1, sum(item.quality_score for item in all_items))
        }

    async def _should_refresh(self, window: ContextWindow) -> Dict[str, Any]:
        """Determine whether a window should be refreshed."""

        time_since_refresh = datetime.utcnow() - window.last_resized

        if time_since_refresh > timedelta(minutes=60):
            return {"needed": True, "reason": "time_threshold"}

        utilization = window.current_size / window.size_limit
        if utilization > 0.95:
            return {"needed": True, "reason": "high_utilization"}

        low_quality_items = sum(1 for item in window.items if item.quality_score < 0.5)
        if low_quality_items > len(window.items) * 0.3:
            return {"needed": True, "reason": "low_quality"}

        return {"needed": False, "reason": "no_refresh_needed"}

    async def _determine_refresh_strategy(self, window: ContextWindow) -> RefreshTrigger:
        """Determine the best refresh strategy for a window."""

        utilization = window.current_size / window.size_limit
        age = (datetime.utcnow() - window.last_resized).total_seconds() / 60

        if utilization > 0.9:
            return RefreshTrigger.CAPACITY_BASED
        elif age > 45:
            return RefreshTrigger.TIME_BASED
        elif window.strategy == SizingStrategy.PREDICTIVE:
            return RefreshTrigger.INTERACTION_BASED
        else:
            return RefreshTrigger.RELEVANCE_BASED

    async def _update_window_metrics(self, window: ContextWindow) -> None:
        """Update per-window and global metrics."""

        window.metrics.update({
            "utilization": window.current_size / window.size_limit,
            "avg_relevance": np.mean([item.relevance_score for item in window.items]) if window.items else 0,
            "avg_quality": np.mean([item.quality_score for item in window.items]) if window.items else 0,
            "diversity_score": len(set(item.modality for item in window.items)) / len(window.items) if window.items else 0,
            "last_updated": datetime.utcnow().isoformat()
        })

        self.metrics["total_items"] = sum(w.current_size for w in self.context_windows.values())
        self.metrics["average_window_utilization"] = np.mean(
            [w.current_size / w.size_limit for w in self.context_windows.values()]
        )

    async def _remove_context_window(self, window_id: str) -> None:
        """Remove a context window."""
        if window_id in self.context_windows:
            del self.context_windows[window_id]
            self.metrics["total_windows"] = len(self.context_windows)

    async def _refresh_item_relevance(self, item: ContextItem) -> None:
        """Refresh an item's relevance score by applying a small decay."""
        item.relevance_score *= 0.95

    async def _recalculate_relevance(self, item: ContextItem) -> None:
        """Recalculate an item's relevance score (placeholder hook)."""
        pass

    async def _get_recent_interactions(self, window: ContextWindow) -> List[Dict[str, Any]]:
        """Get recent interactions affecting the window (placeholder hook)."""
        return []

    async def _update_relevance_from_interactions(self, item: ContextItem, interactions: List[Dict[str, Any]]) -> float:
        """Update relevance based on recent interactions (placeholder hook)."""
        return item.relevance_score

    async def _recalculate_quality(self, item: ContextItem) -> float:
        """Recalculate an item's quality score (placeholder hook)."""
        return item.quality_score

    async def _update_refresh_metrics(self, window: ContextWindow, refresh_result: Dict[str, Any]) -> None:
        """Update refresh-related metrics."""
        window.last_resized = datetime.utcnow()

    async def _optimize_relevance(self, window: ContextWindow) -> Dict[str, Any]:
        """Optimize the window for relevance."""

        initial_avg_relevance = np.mean([item.relevance_score for item in window.items]) if window.items else 0

        # Remove items that fall well below the current average relevance.
        threshold = initial_avg_relevance * 0.7
        items_to_remove = [item for item in window.items if item.relevance_score < threshold]

        for item in items_to_remove:
            window.items.remove(item)
            window.current_size -= 1

        final_avg_relevance = np.mean([item.relevance_score for item in window.items]) if window.items else 0

        return {
            "improved": final_avg_relevance > initial_avg_relevance,
            "initial_avg": initial_avg_relevance,
            "final_avg": final_avg_relevance,
            "items_removed": len(items_to_remove)
        }

    async def _optimize_efficiency(self, window: ContextWindow) -> Dict[str, Any]:
        """Optimize the window for efficiency."""

        initial_diversity = len(set(item.modality for item in window.items)) / len(window.items) if window.items else 0

        # Rebalancing toward the target diversity is currently a placeholder.
        target_diversity = 0.6

        return {
            "improved": True,
            "initial_diversity": initial_diversity,
            "target_diversity": target_diversity
        }

    async def _optimize_quality(self, window: ContextWindow) -> Dict[str, Any]:
        """Optimize the window for quality."""

        initial_avg_quality = np.mean([item.quality_score for item in window.items]) if window.items else 0

        window.items.sort(key=lambda x: x.quality_score, reverse=True)

        final_avg_quality = np.mean([item.quality_score for item in window.items]) if window.items else 0

        return {
            "improved": final_avg_quality >= initial_avg_quality,
            "initial_avg": initial_avg_quality,
            "final_avg": final_avg_quality
        }

    async def _optimize_diversity(self, window: ContextWindow) -> Dict[str, Any]:
        """Optimize the window for diversity."""

        modality_counts = defaultdict(int)
        for item in window.items:
            modality_counts[item.modality] += 1

        max_modality_count = max(modality_counts.values()) if modality_counts else 0

        return {
            "improved": True,
            "modality_distribution": dict(modality_counts),
            "max_modality_count": max_modality_count
        }

    async def _schedule_refresh(self, window_id: str, trigger: RefreshTrigger) -> None:
        """Schedule a refresh for the window."""
        # Record the requested trigger; an external scheduler can consume this mapping.
        self.refresh_scheduler[window_id] = trigger

    @property
    def sizing_strategies(self) -> Dict[SizingStrategy, Callable]:
        """Get the available sizing strategies."""
        return self.sizing_algorithms


class ContextConflictResolver:
    """Specialized conflict resolution for context items."""

    def __init__(self):
        self.resolution_strategies = {
            "priority_based": self._priority_resolution,
            "relevance_based": self._relevance_resolution,
            "quality_based": self._quality_resolution,
            "temporal_based": self._temporal_resolution
        }

    async def resolve_conflict(self, window: ContextWindow, conflict: ContextConflict) -> Dict[str, Any]:
        """Resolve a context conflict using the strategy named on the conflict."""

        strategy_func = self.resolution_strategies.get(conflict.resolution_strategy)
        if not strategy_func:
            return {"success": False, "reason": "unknown_strategy"}

        return await strategy_func(window, conflict)

    async def _priority_resolution(self, window: ContextWindow, conflict: ContextConflict) -> Dict[str, Any]:
        """Resolve a conflict based on priority (placeholder)."""
        return {"success": True, "strategy": "priority_based"}

    async def _relevance_resolution(self, window: ContextWindow, conflict: ContextConflict) -> Dict[str, Any]:
        """Resolve a conflict based on relevance (placeholder)."""
        return {"success": True, "strategy": "relevance_based"}

    async def _quality_resolution(self, window: ContextWindow, conflict: ContextConflict) -> Dict[str, Any]:
        """Resolve a conflict based on quality (placeholder)."""
        return {"success": True, "strategy": "quality_based"}

    async def _temporal_resolution(self, window: ContextWindow, conflict: ContextConflict) -> Dict[str, Any]:
        """Resolve a conflict based on temporal information (placeholder)."""
        return {"success": True, "strategy": "temporal_based"}


if __name__ == "__main__":
    print("Context Management with Dynamic Sizing System Initialized")
    print("=" * 60)

    manager = ContextManager()
    print("Ready for advanced context management and dynamic sizing!")