# GlycoAI / unified_data_manager.py
# (Hugging Face upload metadata removed: uploaded by Valentina9502, "Upload 3 files", commit cdda4ac)
"""
Unified Data Manager for GlycoAI - MODIFIED VERSION
Sarah now has unstable glucose values for demonstration
"""
import logging
from typing import Dict, Any, Optional, Tuple
import pandas as pd
from datetime import datetime, timedelta
from dataclasses import asdict
import numpy as np
import random
from apifunctions import (
DexcomAPI,
GlucoseAnalyzer,
DEMO_USERS,
DemoUser
)
logger = logging.getLogger(__name__)
class UnifiedDataManager:
    """
    MODIFIED: Unified data manager with Sarah having unstable glucose patterns

    Holds a single copy of the loaded demo user, the raw and processed glucose
    data, and the statistics/patterns derived from them, so the UI and the AI
    agent always read the same numbers.
    """

    def __init__(self) -> None:
        """Create the API/analyzer helpers and reset all cached state."""
        self.dexcom_api = DexcomAPI()
        self.analyzer = GlucoseAnalyzer()
        # Plain string: the original used an f-string with no placeholders.
        logger.info("UnifiedDataManager initialized - Sarah will have unstable glucose patterns")
        # Single source of truth for all data
        self.current_user: Optional[DemoUser] = None
        self.raw_glucose_data: Optional[list] = None
        self.processed_glucose_data: Optional[pd.DataFrame] = None
        self.calculated_stats: Optional[Dict] = None
        self.identified_patterns: Optional[Dict] = None
        # Metadata
        self.data_loaded_at: Optional[datetime] = None
        self.data_source: str = "none"  # "dexcom_api", "mock", or "none"
def load_user_data(self, user_key: str, force_reload: bool = False) -> Dict[str, Any]:
"""
MODIFIED: Load glucose data with Sarah having unstable patterns
"""
# Check if we already have data for this user and it's recent
if (not force_reload and
self.current_user and
self.current_user == DEMO_USERS.get(user_key) and
self.data_loaded_at and
(datetime.now() - self.data_loaded_at).seconds < 300): # 5 minutes cache
logger.info(f"Using cached data for {user_key}")
return self._build_success_response()
try:
if user_key not in DEMO_USERS:
return {
"success": False,
"message": f"❌ Invalid user key '{user_key}'. Available: {', '.join(DEMO_USERS.keys())}"
}
logger.info(f"Loading data for user: {user_key}")
# Set current user
self.current_user = DEMO_USERS[user_key]
# Call API EXACTLY as it was working before
try:
logger.info(f"Attempting Dexcom API authentication for {user_key}")
# ORIGINAL WORKING METHOD: Use the simulate_demo_login exactly as before
access_token = self.dexcom_api.simulate_demo_login(user_key)
logger.info(f"Dexcom authentication result: {bool(access_token)}")
if access_token:
# ORIGINAL WORKING METHOD: Get data with 14-day range
end_date = datetime.now()
start_date = end_date - timedelta(days=14)
# Call get_egv_data EXACTLY as it was working before
self.raw_glucose_data = self.dexcom_api.get_egv_data(
start_date.isoformat(),
end_date.isoformat()
)
if self.raw_glucose_data and len(self.raw_glucose_data) > 0:
self.data_source = "dexcom_api"
logger.info(f"βœ… Successfully loaded {len(self.raw_glucose_data)} readings from Dexcom API")
else:
logger.warning("Dexcom API returned empty data - falling back to mock data")
raise Exception("Empty data from Dexcom API")
else:
logger.warning("Failed to get access token - falling back to mock data")
raise Exception("Authentication failed")
except Exception as api_error:
logger.warning(f"Dexcom API failed ({str(api_error)}) - using mock data fallback")
self.raw_glucose_data = self._generate_realistic_mock_data(user_key)
self.data_source = "mock"
# Process the raw data (same processing for everyone)
self.processed_glucose_data = self.analyzer.process_egv_data(self.raw_glucose_data)
if self.processed_glucose_data is None or self.processed_glucose_data.empty:
return {
"success": False,
"message": "❌ Failed to process glucose data"
}
# Calculate statistics (single source of truth)
self.calculated_stats = self._calculate_unified_stats()
# Identify patterns
self.identified_patterns = self.analyzer.identify_patterns(self.processed_glucose_data)
# Mark when data was loaded
self.data_loaded_at = datetime.now()
logger.info(f"Successfully loaded and processed data for {self.current_user.name}")
logger.info(f"Data source: {self.data_source}, Readings: {len(self.processed_glucose_data)}")
logger.info(f"TIR: {self.calculated_stats.get('time_in_range_70_180', 0):.1f}%")
return self._build_success_response()
except Exception as e:
logger.error(f"Failed to load user data: {e}")
return {
"success": False,
"message": f"❌ Failed to load user data: {str(e)}"
}
def get_stats_for_ui(self) -> Dict[str, Any]:
"""Get statistics formatted for the UI display"""
if not self.calculated_stats:
return {}
return {
**self.calculated_stats,
"data_source": self.data_source,
"loaded_at": self.data_loaded_at.isoformat() if self.data_loaded_at else None,
"user_name": self.current_user.name if self.current_user else None
}
def get_context_for_agent(self) -> Dict[str, Any]:
"""Get context formatted for the AI agent"""
if not self.current_user or not self.calculated_stats:
return {"error": "No user data loaded"}
# Build agent context with the SAME data as UI
context = {
"user": {
"name": self.current_user.name,
"age": self.current_user.age,
"diabetes_type": self.current_user.diabetes_type,
"device_type": self.current_user.device_type,
"years_with_diabetes": self.current_user.years_with_diabetes,
"typical_pattern": getattr(self.current_user, 'typical_glucose_pattern', 'normal')
},
"statistics": self._safe_convert_for_json(self.calculated_stats),
"patterns": self._safe_convert_for_json(self.identified_patterns),
"data_points": len(self.processed_glucose_data) if self.processed_glucose_data is not None else 0,
"recent_readings": self._get_recent_readings_for_agent(),
"data_metadata": {
"source": self.data_source,
"loaded_at": self.data_loaded_at.isoformat() if self.data_loaded_at else None,
"data_age_minutes": int((datetime.now() - self.data_loaded_at).total_seconds() / 60) if self.data_loaded_at else None
}
}
return context
def get_chart_data(self) -> Optional[pd.DataFrame]:
"""Get processed data for chart display"""
return self.processed_glucose_data
def _calculate_unified_stats(self) -> Dict[str, Any]:
"""Calculate statistics using a single, consistent method"""
if self.processed_glucose_data is None or self.processed_glucose_data.empty:
return {"error": "No data available"}
try:
# Get glucose values
glucose_values = self.processed_glucose_data['value'].dropna()
if len(glucose_values) == 0:
return {"error": "No valid glucose values"}
# Convert to numpy array for consistent calculations
import numpy as np
values = np.array(glucose_values.tolist(), dtype=float)
# Calculate basic statistics
avg_glucose = float(np.mean(values))
min_glucose = float(np.min(values))
max_glucose = float(np.max(values))
std_glucose = float(np.std(values))
total_readings = int(len(values))
# Calculate time in ranges - CONSISTENT METHOD
in_range_mask = (values >= 70) & (values <= 180)
below_range_mask = values < 70
above_range_mask = values > 180
in_range_count = int(np.sum(in_range_mask))
below_range_count = int(np.sum(below_range_mask))
above_range_count = int(np.sum(above_range_mask))
# Calculate percentages
time_in_range = (in_range_count / total_readings) * 100 if total_readings > 0 else 0
time_below_70 = (below_range_count / total_readings) * 100 if total_readings > 0 else 0
time_above_180 = (above_range_count / total_readings) * 100 if total_readings > 0 else 0
# Calculate additional metrics
gmi = 3.31 + (0.02392 * avg_glucose) # Glucose Management Indicator
cv = (std_glucose / avg_glucose) * 100 if avg_glucose > 0 else 0 # Coefficient of Variation
stats = {
"average_glucose": avg_glucose,
"min_glucose": min_glucose,
"max_glucose": max_glucose,
"std_glucose": std_glucose,
"time_in_range_70_180": time_in_range,
"time_below_70": time_below_70,
"time_above_180": time_above_180,
"total_readings": total_readings,
"gmi": gmi,
"cv": cv,
"in_range_count": in_range_count,
"below_range_count": below_range_count,
"above_range_count": above_range_count
}
# Log for debugging
logger.info(f"Calculated stats - TIR: {time_in_range:.1f}%, Total: {total_readings}, In range: {in_range_count}")
return stats
except Exception as e:
logger.error(f"Error calculating unified stats: {e}")
return {"error": f"Statistics calculation failed: {str(e)}"}
def _generate_realistic_mock_data(self, user_key: str) -> list:
"""Generate realistic mock data with SARAH having UNSTABLE patterns"""
# MODIFIED: Sarah now has unstable glucose patterns
pattern_map = {
"sarah_g7": "unstable_high_variability", # CHANGED: Sarah now unstable
"marcus_one": "dawn_phenomenon",
"jennifer_g6": "normal",
"robert_receiver": "dawn_phenomenon"
}
user_pattern = pattern_map.get(user_key, "normal")
# Generate 14 days of data with specific patterns
if user_key == "sarah_g7":
# Generate UNSTABLE data for Sarah
mock_data = self._generate_unstable_glucose_data()
logger.info(f"Generated {len(mock_data)} UNSTABLE mock data points for Sarah")
else:
# Use normal patterns for other users
mock_data = self._create_realistic_pattern(days=14, user_type=user_pattern)
logger.info(f"Generated {len(mock_data)} mock data points for {user_key} with pattern {user_pattern}")
return mock_data
    def _generate_unstable_glucose_data(self) -> list:
        """Generate highly variable, unstable glucose data for Sarah

        Produces 14 days of 5-minute readings (oldest first) whose base level
        and noise both depend on the time of day, with extra weekend
        variability and occasional injected severe lows/highs, yielding a high
        CV and low time-in-range for demo purposes.

        Returns:
            list of reading dicts with systemTime/displayTime (ISO strings),
            value, trend, realtimeValue and smoothedValue keys.
        """
        readings = []
        now = datetime.now()
        # Generate 14 days of unstable data (every 5 minutes)
        total_minutes = 14 * 24 * 60
        interval_minutes = 5
        total_readings = total_minutes // interval_minutes
        logger.info(f"Generating {total_readings} unstable glucose readings for Sarah")
        for i in range(total_readings):
            # Walk forward from (now - 14 days) so readings[-1] is the newest.
            timestamp = now - timedelta(minutes=total_minutes - (i * interval_minutes))
            # Create highly variable glucose patterns
            hour = timestamp.hour
            day_of_week = timestamp.weekday()
            # Base glucose with high variability - each window draws its own
            # base level plus a signed variability term.
            if hour >= 6 and hour <= 8:  # Morning - dawn phenomenon + high variability
                base_glucose = random.uniform(140, 220)
                variability = random.uniform(-40, 60)
            elif hour >= 12 and hour <= 14:  # Lunch - post-meal spikes
                base_glucose = random.uniform(120, 280)
                variability = random.uniform(-30, 80)
            elif hour >= 18 and hour <= 20:  # Dinner - high spikes
                base_glucose = random.uniform(130, 300)
                variability = random.uniform(-50, 70)
            elif hour >= 22 or hour <= 4:  # Night - unpredictable lows and highs
                base_glucose = random.uniform(60, 200)
                variability = random.uniform(-30, 50)
            else:  # Other times - still unstable
                base_glucose = random.uniform(80, 220)
                variability = random.uniform(-40, 60)
            # Add weekend effect (even more unstable)
            if day_of_week >= 5:  # Weekend (Saturday=5, Sunday=6)
                base_glucose += random.uniform(-20, 40)
                variability += random.uniform(-20, 30)
            # Add random noise for high variability
            noise = random.uniform(-25, 25)
            glucose_value = base_glucose + variability + noise
            # Ensure realistic bounds but allow extreme values
            glucose_value = max(40, min(400, glucose_value))
            # Add some random severe lows and highs
            if random.random() < 0.05:  # 5% chance of severe events
                if random.random() < 0.5:
                    glucose_value = random.uniform(45, 65)  # Severe low
                else:
                    glucose_value = random.uniform(280, 350)  # Severe high
            # Determine trend based on glucose change vs the previous reading
            # (thresholds are mg/dL change per 5-minute interval).
            if i > 0:
                prev_glucose = readings[-1]['value']
                glucose_change = glucose_value - prev_glucose
                if glucose_change > 15:
                    trend = "rising_rapidly"
                elif glucose_change > 5:
                    trend = "rising"
                elif glucose_change < -15:
                    trend = "falling_rapidly"
                elif glucose_change < -5:
                    trend = "falling"
                else:
                    trend = "flat"
            else:
                trend = "flat"
            reading = {
                "systemTime": timestamp.isoformat(),
                "displayTime": timestamp.isoformat(),
                "value": round(glucose_value, 1),
                "trend": trend,
                "realtimeValue": round(glucose_value, 1),
                # Smoothed value deliberately differs slightly from the raw value.
                "smoothedValue": round(glucose_value * 0.9 + random.uniform(-5, 5), 1)
            }
            readings.append(reading)
        # Log statistics of generated data (sanity check on the instability)
        values = [r['value'] for r in readings]
        avg_glucose = np.mean(values)
        std_glucose = np.std(values)
        cv = (std_glucose / avg_glucose) * 100
        in_range = sum(1 for v in values if 70 <= v <= 180)
        below_range = sum(1 for v in values if v < 70)
        above_range = sum(1 for v in values if v > 180)
        tir = (in_range / len(values)) * 100
        tbr = (below_range / len(values)) * 100
        tar = (above_range / len(values)) * 100
        logger.info(f"Sarah's UNSTABLE data generated:")
        logger.info(f" Average: {avg_glucose:.1f} mg/dL")
        logger.info(f" CV: {cv:.1f}% (VERY HIGH)")
        logger.info(f" TIR: {tir:.1f}% (LOW)")
        logger.info(f" TBR: {tbr:.1f}% (HIGH)")
        logger.info(f" TAR: {tar:.1f}% (HIGH)")
        return readings
    def _create_realistic_pattern(self, days: int = 14, user_type: str = "normal") -> list:
        """Create realistic glucose patterns for non-Sarah users

        Args:
            days: Number of days of history to generate (5-minute intervals).
            user_type: "dawn_phenomenon" for elevated mornings and meals;
                any other value gets the milder "normal" meal-bump pattern.

        Returns:
            list of reading dicts in the same shape as the Dexcom EGV payload,
            oldest reading first.
        """
        readings = []
        now = datetime.now()
        # Generate data every 5 minutes
        total_minutes = days * 24 * 60
        interval_minutes = 5
        total_readings = total_minutes // interval_minutes
        for i in range(total_readings):
            # Walk forward from (now - days) so readings[-1] is the newest.
            timestamp = now - timedelta(minutes=total_minutes - (i * interval_minutes))
            hour = timestamp.hour
            # Base patterns for different user types
            if user_type == "dawn_phenomenon":
                if hour >= 6 and hour <= 8:  # Dawn phenomenon
                    base_glucose = random.uniform(150, 190)
                elif hour >= 12 and hour <= 14:  # Post lunch
                    base_glucose = random.uniform(140, 180)
                elif hour >= 18 and hour <= 20:  # Post dinner
                    base_glucose = random.uniform(130, 170)
                else:
                    base_glucose = random.uniform(90, 140)
            else:  # Normal pattern
                if hour >= 12 and hour <= 14:  # Post lunch
                    base_glucose = random.uniform(120, 160)
                elif hour >= 18 and hour <= 20:  # Post dinner
                    base_glucose = random.uniform(110, 150)
                else:
                    base_glucose = random.uniform(80, 120)
            # Add moderate variability
            glucose_value = base_glucose + random.uniform(-15, 15)
            # Clamp to a plausible CGM range for a stable user.
            glucose_value = max(70, min(250, glucose_value))
            reading = {
                "systemTime": timestamp.isoformat(),
                "displayTime": timestamp.isoformat(),
                "value": round(glucose_value, 1),
                "trend": "flat",  # trend estimation not modeled for these users
                "realtimeValue": round(glucose_value, 1),
                "smoothedValue": round(glucose_value, 1)
            }
            readings.append(reading)
        return readings
def _get_recent_readings_for_agent(self, count: int = 5) -> list:
"""Get recent readings formatted for agent context"""
if self.processed_glucose_data is None or self.processed_glucose_data.empty:
return []
try:
recent_df = self.processed_glucose_data.tail(count)
readings = []
for _, row in recent_df.iterrows():
display_time = row.get('displayTime') or row.get('systemTime')
glucose_value = row.get('value')
trend_value = row.get('trend', 'flat')
if pd.notna(display_time):
if isinstance(display_time, str):
time_str = display_time
else:
time_str = pd.to_datetime(display_time).isoformat()
else:
time_str = datetime.now().isoformat()
if pd.notna(glucose_value):
glucose_clean = self._safe_convert_for_json(glucose_value)
else:
glucose_clean = None
trend_clean = str(trend_value) if pd.notna(trend_value) else 'flat'
readings.append({
"time": time_str,
"glucose": glucose_clean,
"trend": trend_clean
})
return readings
except Exception as e:
logger.error(f"Error getting recent readings: {e}")
return []
def _safe_convert_for_json(self, obj):
"""Safely convert objects for JSON serialization"""
import numpy as np
if obj is None:
return None
elif isinstance(obj, (np.integer, np.int64, np.int32)):
return int(obj)
elif isinstance(obj, (np.floating, np.float64, np.float32)):
if np.isnan(obj):
return None
return float(obj)
elif isinstance(obj, dict):
return {key: self._safe_convert_for_json(value) for key, value in obj.items()}
elif isinstance(obj, list):
return [self._safe_convert_for_json(item) for item in obj]
elif isinstance(obj, pd.Timestamp):
return obj.isoformat()
else:
return obj
def _build_success_response(self) -> Dict[str, Any]:
"""Build a consistent success response"""
data_points = len(self.processed_glucose_data) if self.processed_glucose_data is not None else 0
avg_glucose = self.calculated_stats.get('average_glucose', 0)
time_in_range = self.calculated_stats.get('time_in_range_70_180', 0)
return {
"success": True,
"message": f"βœ… Successfully loaded data for {self.current_user.name}",
"user": asdict(self.current_user),
"data_points": data_points,
"stats": self.calculated_stats,
"data_source": self.data_source,
"summary": f"πŸ“Š {data_points} readings | Avg: {avg_glucose:.1f} mg/dL | TIR: {time_in_range:.1f}% | Source: {self.data_source}"
}
def validate_data_consistency(self) -> Dict[str, Any]:
"""Validate that all components are using consistent data"""
if not self.calculated_stats:
return {"valid": False, "message": "No data loaded"}
validation = {
"valid": True,
"data_source": self.data_source,
"data_age_minutes": int((datetime.now() - self.data_loaded_at).total_seconds() / 60) if self.data_loaded_at else None,
"total_readings": self.calculated_stats.get('total_readings', 0),
"time_in_range": self.calculated_stats.get('time_in_range_70_180', 0),
"average_glucose": self.calculated_stats.get('average_glucose', 0),
"user": self.current_user.name if self.current_user else None
}
logger.info(f"Data consistency check: {validation}")
return validation
# ADDITIONAL: Debug function to test the API connection as it was working before
def test_original_api_method():
    """Test the API exactly as it was working before unified data manager

    Manual smoke test (not a pytest test despite the name): authenticates the
    sarah_g7 demo user via DexcomAPI.simulate_demo_login, pulls a 14-day EGV
    window, and prints progress to stdout.

    Returns:
        True if authentication succeeded and data came back, False otherwise.
    """
    from apifunctions import DexcomAPI, DEMO_USERS
    print("πŸ” Testing API exactly as it was working before...")
    api = DexcomAPI()
    # Test with sarah_g7 as it was working before
    user_key = "sarah_g7"
    user = DEMO_USERS[user_key]
    print(f"Testing with {user.name} ({user.username}) - NOW WITH UNSTABLE GLUCOSE")
    try:
        # Call simulate_demo_login exactly as before
        access_token = api.simulate_demo_login(user_key)
        print(f"βœ… Authentication: {bool(access_token)}")
        if access_token:
            # Call get_egv_data exactly as before (same 14-day window the
            # unified manager uses)
            end_date = datetime.now()
            start_date = end_date - timedelta(days=14)
            egv_data = api.get_egv_data(
                start_date.isoformat(),
                end_date.isoformat()
            )
            print(f"βœ… EGV Data: {len(egv_data)} readings")
            if egv_data:
                print(f"βœ… SUCCESS! API is working as before (with Sarah's unstable patterns)")
                sample = egv_data[0] if egv_data else {}
                print(f"Sample reading: {sample}")
                return True
            else:
                print("⚠️ API authenticated but returned no data")
                return False
        else:
            print("❌ Authentication failed")
            return False
    except Exception as e:
        # Broad catch is deliberate here: this is a best-effort console probe.
        print(f"❌ Error: {e}")
        return False
if __name__ == "__main__":
    # Test the original API method
    # Running this module directly exercises the demo API path end-to-end.
    test_original_api_method()