Spaces:
Running
Running
File size: 24,532 Bytes
cdda4ac |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 |
"""
Unified Data Manager for GlycoAI - MODIFIED VERSION
Sarah now has unstable glucose values for demonstration
"""
import logging
from typing import Dict, Any, Optional, Tuple
import pandas as pd
from datetime import datetime, timedelta
from dataclasses import asdict
import numpy as np
import random
from apifunctions import (
DexcomAPI,
GlucoseAnalyzer,
DEMO_USERS,
DemoUser
)
logger = logging.getLogger(__name__)
class UnifiedDataManager:
    """
    MODIFIED: Unified data manager with Sarah having unstable glucose patterns
    """

    def __init__(self):
        # Project-level API client and analytics helper.
        self.dexcom_api = DexcomAPI()
        self.analyzer = GlucoseAnalyzer()
        logger.info("UnifiedDataManager initialized - Sarah will have unstable glucose patterns")

        # Single source of truth for all loaded data.
        self.current_user: Optional[DemoUser] = None
        self.raw_glucose_data: Optional[list] = None
        self.processed_glucose_data: Optional[pd.DataFrame] = None
        self.calculated_stats: Optional[Dict] = None
        self.identified_patterns: Optional[Dict] = None

        # Metadata about the last load.
        self.data_loaded_at: Optional[datetime] = None
        self.data_source: str = "none"  # "dexcom_api", "mock", or "none"
def load_user_data(self, user_key: str, force_reload: bool = False) -> Dict[str, Any]:
    """
    Load and process glucose data for a demo user.

    Tries the Dexcom API first; on any failure (auth, transport, empty
    payload) falls back to generated mock data — Sarah's mock data is
    intentionally unstable.

    Args:
        user_key: Key into DEMO_USERS identifying the demo user.
        force_reload: When True, bypass the 5-minute cache.

    Returns:
        Dict with "success" plus loaded-data details, or an error "message".
    """
    # Serve cached results when the same user's data is under 5 minutes old.
    # BUGFIX: use total_seconds() — timedelta.seconds ignores the days
    # component, so day-old data could previously look fresh.
    if (not force_reload and
            self.current_user and
            self.current_user == DEMO_USERS.get(user_key) and
            self.data_loaded_at and
            (datetime.now() - self.data_loaded_at).total_seconds() < 300):
        logger.info(f"Using cached data for {user_key}")
        return self._build_success_response()
    try:
        if user_key not in DEMO_USERS:
            return {
                "success": False,
                "message": f"❌ Invalid user key '{user_key}'. Available: {', '.join(DEMO_USERS.keys())}"
            }
        logger.info(f"Loading data for user: {user_key}")
        self.current_user = DEMO_USERS[user_key]
        try:
            logger.info(f"Attempting Dexcom API authentication for {user_key}")
            access_token = self.dexcom_api.simulate_demo_login(user_key)
            logger.info(f"Dexcom authentication result: {bool(access_token)}")
            if access_token:
                # Pull a 14-day window of estimated glucose values.
                end_date = datetime.now()
                start_date = end_date - timedelta(days=14)
                self.raw_glucose_data = self.dexcom_api.get_egv_data(
                    start_date.isoformat(),
                    end_date.isoformat()
                )
                if self.raw_glucose_data and len(self.raw_glucose_data) > 0:
                    self.data_source = "dexcom_api"
                    logger.info(f"✅ Successfully loaded {len(self.raw_glucose_data)} readings from Dexcom API")
                else:
                    logger.warning("Dexcom API returned empty data - falling back to mock data")
                    raise Exception("Empty data from Dexcom API")
            else:
                logger.warning("Failed to get access token - falling back to mock data")
                raise Exception("Authentication failed")
        except Exception as api_error:
            # Any API problem degrades gracefully to generated mock data.
            logger.warning(f"Dexcom API failed ({str(api_error)}) - using mock data fallback")
            self.raw_glucose_data = self._generate_realistic_mock_data(user_key)
            self.data_source = "mock"
        # Same processing pipeline regardless of the data source.
        self.processed_glucose_data = self.analyzer.process_egv_data(self.raw_glucose_data)
        if self.processed_glucose_data is None or self.processed_glucose_data.empty:
            return {
                "success": False,
                "message": "❌ Failed to process glucose data"
            }
        # Single source of truth for stats/patterns shared by UI and agent.
        self.calculated_stats = self._calculate_unified_stats()
        self.identified_patterns = self.analyzer.identify_patterns(self.processed_glucose_data)
        self.data_loaded_at = datetime.now()
        logger.info(f"Successfully loaded and processed data for {self.current_user.name}")
        logger.info(f"Data source: {self.data_source}, Readings: {len(self.processed_glucose_data)}")
        logger.info(f"TIR: {self.calculated_stats.get('time_in_range_70_180', 0):.1f}%")
        return self._build_success_response()
    except Exception as e:
        logger.error(f"Failed to load user data: {e}")
        return {
            "success": False,
            "message": f"❌ Failed to load user data: {str(e)}"
        }
def get_stats_for_ui(self) -> Dict[str, Any]:
    """Return the cached statistics augmented with display metadata for the UI."""
    if not self.calculated_stats:
        return {}
    loaded_at = self.data_loaded_at.isoformat() if self.data_loaded_at else None
    user_name = self.current_user.name if self.current_user else None
    ui_stats = dict(self.calculated_stats)
    ui_stats.update(
        data_source=self.data_source,
        loaded_at=loaded_at,
        user_name=user_name,
    )
    return ui_stats
def get_context_for_agent(self) -> Dict[str, Any]:
    """Assemble user profile, stats, patterns and recent readings for the AI agent."""
    if not self.current_user or not self.calculated_stats:
        return {"error": "No user data loaded"}
    user = self.current_user
    # Profile is built from the same user object the UI displays.
    profile = {
        "name": user.name,
        "age": user.age,
        "diabetes_type": user.diabetes_type,
        "device_type": user.device_type,
        "years_with_diabetes": user.years_with_diabetes,
        "typical_pattern": getattr(user, 'typical_glucose_pattern', 'normal'),
    }
    if self.processed_glucose_data is not None:
        point_count = len(self.processed_glucose_data)
    else:
        point_count = 0
    if self.data_loaded_at:
        loaded_iso = self.data_loaded_at.isoformat()
        age_minutes = int((datetime.now() - self.data_loaded_at).total_seconds() / 60)
    else:
        loaded_iso = None
        age_minutes = None
    return {
        "user": profile,
        "statistics": self._safe_convert_for_json(self.calculated_stats),
        "patterns": self._safe_convert_for_json(self.identified_patterns),
        "data_points": point_count,
        "recent_readings": self._get_recent_readings_for_agent(),
        "data_metadata": {
            "source": self.data_source,
            "loaded_at": loaded_iso,
            "data_age_minutes": age_minutes,
        },
    }
def get_chart_data(self) -> Optional[pd.DataFrame]:
    """Expose the processed glucose DataFrame for chart rendering (None if not loaded)."""
    return self.processed_glucose_data
def _calculate_unified_stats(self) -> Dict[str, Any]:
    """
    Compute glucose statistics from self.processed_glucose_data.

    Returns a dict with average/min/max/std, time-in-range percentages
    (ADA 70-180 mg/dL target), GMI and CV, plus raw bucket counts; or
    {"error": ...} when no usable data is loaded.
    """
    if self.processed_glucose_data is None or self.processed_glucose_data.empty:
        return {"error": "No data available"}
    try:
        glucose_values = self.processed_glucose_data['value'].dropna()
        if glucose_values.empty:
            return {"error": "No valid glucose values"}
        # One float ndarray so every metric is derived from identical data.
        # (Removed redundant in-function numpy import; module imports np.)
        values = glucose_values.to_numpy(dtype=float)
        avg_glucose = float(np.mean(values))
        min_glucose = float(np.min(values))
        max_glucose = float(np.max(values))
        std_glucose = float(np.std(values))
        total_readings = int(len(values))
        # Time-in-range buckets - CONSISTENT METHOD.
        in_range_count = int(np.sum((values >= 70) & (values <= 180)))
        below_range_count = int(np.sum(values < 70))
        above_range_count = int(np.sum(values > 180))
        time_in_range = (in_range_count / total_readings) * 100 if total_readings > 0 else 0
        time_below_70 = (below_range_count / total_readings) * 100 if total_readings > 0 else 0
        time_above_180 = (above_range_count / total_readings) * 100 if total_readings > 0 else 0
        # GMI: Glucose Management Indicator regression from mean glucose.
        gmi = 3.31 + (0.02392 * avg_glucose)
        # CV: coefficient of variation, standard glycemic-variability metric.
        cv = (std_glucose / avg_glucose) * 100 if avg_glucose > 0 else 0
        stats = {
            "average_glucose": avg_glucose,
            "min_glucose": min_glucose,
            "max_glucose": max_glucose,
            "std_glucose": std_glucose,
            "time_in_range_70_180": time_in_range,
            "time_below_70": time_below_70,
            "time_above_180": time_above_180,
            "total_readings": total_readings,
            "gmi": gmi,
            "cv": cv,
            "in_range_count": in_range_count,
            "below_range_count": below_range_count,
            "above_range_count": above_range_count
        }
        logger.info(f"Calculated stats - TIR: {time_in_range:.1f}%, Total: {total_readings}, In range: {in_range_count}")
        return stats
    except Exception as e:
        logger.error(f"Error calculating unified stats: {e}")
        return {"error": f"Statistics calculation failed: {str(e)}"}
def _generate_realistic_mock_data(self, user_key: str) -> list:
    """Generate realistic mock data with SARAH having UNSTABLE patterns"""
    # Map each demo user to the glucose pattern their mock data follows.
    pattern_map = {
        "sarah_g7": "unstable_high_variability",  # Sarah is deliberately unstable
        "marcus_one": "dawn_phenomenon",
        "jennifer_g6": "normal",
        "robert_receiver": "dawn_phenomenon",
    }
    user_pattern = pattern_map.get(user_key, "normal")
    if user_key == "sarah_g7":
        # Sarah gets a dedicated generator producing highly variable data.
        mock_data = self._generate_unstable_glucose_data()
        logger.info(f"Generated {len(mock_data)} UNSTABLE mock data points for Sarah")
        return mock_data
    # Everyone else gets the normal 14-day pattern generator.
    mock_data = self._create_realistic_pattern(days=14, user_type=user_pattern)
    logger.info(f"Generated {len(mock_data)} mock data points for {user_key} with pattern {user_pattern}")
    return mock_data
def _generate_unstable_glucose_data(self) -> list:
    """Generate highly variable, unstable glucose data for Sarah"""
    readings = []
    now = datetime.now()
    # 14 days of readings at the CGM's 5-minute cadence.
    total_minutes = 14 * 24 * 60
    interval_minutes = 5
    total_readings = total_minutes // interval_minutes
    logger.info(f"Generating {total_readings} unstable glucose readings for Sarah")
    for idx in range(total_readings):
        ts = now - timedelta(minutes=total_minutes - (idx * interval_minutes))
        hr = ts.hour
        weekday = ts.weekday()
        # Pick a base level and a variability band by time of day; every
        # window is wide so the trace stays erratic around the clock.
        if 6 <= hr <= 8:  # morning: dawn phenomenon plus high variability
            base = random.uniform(140, 220)
            swing = random.uniform(-40, 60)
        elif 12 <= hr <= 14:  # lunch: post-meal spikes
            base = random.uniform(120, 280)
            swing = random.uniform(-30, 80)
        elif 18 <= hr <= 20:  # dinner: high spikes
            base = random.uniform(130, 300)
            swing = random.uniform(-50, 70)
        elif hr >= 22 or hr <= 4:  # night: unpredictable lows and highs
            base = random.uniform(60, 200)
            swing = random.uniform(-30, 50)
        else:  # other times: still unstable
            base = random.uniform(80, 220)
            swing = random.uniform(-40, 60)
        if weekday >= 5:  # weekends are even more unstable
            base += random.uniform(-20, 40)
            swing += random.uniform(-20, 30)
        # Extra random noise on top of the time-of-day swing.
        glucose = base + swing + random.uniform(-25, 25)
        # Clamp to a plausible sensor range while still allowing extremes.
        glucose = max(40, min(400, glucose))
        # 5% of readings become severe events, split evenly low/high.
        if random.random() < 0.05:
            if random.random() < 0.5:
                glucose = random.uniform(45, 65)  # severe low
            else:
                glucose = random.uniform(280, 350)  # severe high
        # Trend derives from the delta versus the previous reading.
        if idx > 0:
            delta = glucose - readings[-1]['value']
            if delta > 15:
                trend = "rising_rapidly"
            elif delta > 5:
                trend = "rising"
            elif delta < -15:
                trend = "falling_rapidly"
            elif delta < -5:
                trend = "falling"
            else:
                trend = "flat"
        else:
            trend = "flat"
        readings.append({
            "systemTime": ts.isoformat(),
            "displayTime": ts.isoformat(),
            "value": round(glucose, 1),
            "trend": trend,
            "realtimeValue": round(glucose, 1),
            "smoothedValue": round(glucose * 0.9 + random.uniform(-5, 5), 1),
        })
    # Summarize the generated distribution for the logs.
    values = [r['value'] for r in readings]
    avg_glucose = np.mean(values)
    std_glucose = np.std(values)
    cv = (std_glucose / avg_glucose) * 100
    in_range = sum(1 for v in values if 70 <= v <= 180)
    below_range = sum(1 for v in values if v < 70)
    above_range = sum(1 for v in values if v > 180)
    tir = (in_range / len(values)) * 100
    tbr = (below_range / len(values)) * 100
    tar = (above_range / len(values)) * 100
    logger.info(f"Sarah's UNSTABLE data generated:")
    logger.info(f" Average: {avg_glucose:.1f} mg/dL")
    logger.info(f" CV: {cv:.1f}% (VERY HIGH)")
    logger.info(f" TIR: {tir:.1f}% (LOW)")
    logger.info(f" TBR: {tbr:.1f}% (HIGH)")
    logger.info(f" TAR: {tar:.1f}% (HIGH)")
    return readings
def _create_realistic_pattern(self, days: int = 14, user_type: str = "normal") -> list:
    """Create realistic glucose patterns for non-Sarah users"""
    readings = []
    now = datetime.now()
    total_minutes = days * 24 * 60
    step = 5  # one reading every 5 minutes
    for idx in range(total_minutes // step):
        ts = now - timedelta(minutes=total_minutes - (idx * step))
        hr = ts.hour
        # Choose a base glucose level by user pattern and time of day.
        if user_type == "dawn_phenomenon":
            if 6 <= hr <= 8:  # dawn phenomenon window
                base = random.uniform(150, 190)
            elif 12 <= hr <= 14:  # post lunch
                base = random.uniform(140, 180)
            elif 18 <= hr <= 20:  # post dinner
                base = random.uniform(130, 170)
            else:
                base = random.uniform(90, 140)
        else:  # normal pattern
            if 12 <= hr <= 14:  # post lunch
                base = random.uniform(120, 160)
            elif 18 <= hr <= 20:  # post dinner
                base = random.uniform(110, 150)
            else:
                base = random.uniform(80, 120)
        # Moderate noise, clamped to a stable 70-250 mg/dL band.
        value = max(70, min(250, base + random.uniform(-15, 15)))
        stamp = ts.isoformat()
        readings.append({
            "systemTime": stamp,
            "displayTime": stamp,
            "value": round(value, 1),
            "trend": "flat",
            "realtimeValue": round(value, 1),
            "smoothedValue": round(value, 1),
        })
    return readings
def _get_recent_readings_for_agent(self, count: int = 5) -> list:
    """Return the last `count` processed readings as simple dicts for the agent context."""
    df = self.processed_glucose_data
    if df is None or df.empty:
        return []
    try:
        out = []
        for _, row in df.tail(count).iterrows():
            # Prefer displayTime; fall back to systemTime, then "now".
            when = row.get('displayTime') or row.get('systemTime')
            if pd.notna(when):
                time_str = when if isinstance(when, str) else pd.to_datetime(when).isoformat()
            else:
                time_str = datetime.now().isoformat()
            glucose = row.get('value')
            glucose_clean = self._safe_convert_for_json(glucose) if pd.notna(glucose) else None
            trend = row.get('trend', 'flat')
            out.append({
                "time": time_str,
                "glucose": glucose_clean,
                "trend": str(trend) if pd.notna(trend) else 'flat',
            })
        return out
    except Exception as e:
        logger.error(f"Error getting recent readings: {e}")
        return []
def _safe_convert_for_json(self, obj):
    """Recursively coerce numpy scalars, pandas timestamps and containers into JSON-safe values."""
    import numpy as np
    if obj is None:
        return None
    if isinstance(obj, (np.integer, np.int64, np.int32)):
        return int(obj)
    if isinstance(obj, (np.floating, np.float64, np.float32)):
        # NaN has no JSON representation; map it to null.
        return None if np.isnan(obj) else float(obj)
    if isinstance(obj, dict):
        return {k: self._safe_convert_for_json(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [self._safe_convert_for_json(item) for item in obj]
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    # Anything else is assumed to already be serializable.
    return obj
def _build_success_response(self) -> Dict[str, Any]:
    """
    Build the standard success payload returned by load_user_data.

    Derives everything from already-loaded state (current_user,
    calculated_stats, processed_glucose_data, data_source).
    """
    data_points = len(self.processed_glucose_data) if self.processed_glucose_data is not None else 0
    avg_glucose = self.calculated_stats.get('average_glucose', 0)
    time_in_range = self.calculated_stats.get('time_in_range_70_180', 0)
    return {
        "success": True,
        # NOTE(review): original emoji were mojibake-garbled and split the
        # f-string across lines; restored to the intended ✅ / 📊 — confirm.
        "message": f"✅ Successfully loaded data for {self.current_user.name}",
        "user": asdict(self.current_user),
        "data_points": data_points,
        "stats": self.calculated_stats,
        "data_source": self.data_source,
        "summary": f"📊 {data_points} readings | Avg: {avg_glucose:.1f} mg/dL | TIR: {time_in_range:.1f}% | Source: {self.data_source}"
    }
def validate_data_consistency(self) -> Dict[str, Any]:
    """Report a snapshot of loaded data so callers can confirm UI/agent agreement."""
    if not self.calculated_stats:
        return {"valid": False, "message": "No data loaded"}
    age_minutes = None
    if self.data_loaded_at:
        age_minutes = int((datetime.now() - self.data_loaded_at).total_seconds() / 60)
    validation = {
        "valid": True,
        "data_source": self.data_source,
        "data_age_minutes": age_minutes,
        "total_readings": self.calculated_stats.get('total_readings', 0),
        "time_in_range": self.calculated_stats.get('time_in_range_70_180', 0),
        "average_glucose": self.calculated_stats.get('average_glucose', 0),
        "user": self.current_user.name if self.current_user else None,
    }
    logger.info(f"Data consistency check: {validation}")
    return validation
# ADDITIONAL: Debug function to test the API connection as it was working before
def test_original_api_method():
    """
    Smoke-test the Dexcom demo API exactly as the pre-unified-manager code used it.

    Returns:
        True when authentication succeeds and a 14-day EGV fetch yields data,
        False otherwise. Prints progress to stdout.

    Note: the original print strings were mojibake-garbled and split across
    lines (syntax-breaking); restored to the intended emoji.
    """
    from apifunctions import DexcomAPI, DEMO_USERS
    print("🔍 Testing API exactly as it was working before...")
    api = DexcomAPI()
    # Test with sarah_g7 as it was working before.
    user_key = "sarah_g7"
    user = DEMO_USERS[user_key]
    print(f"Testing with {user.name} ({user.username}) - NOW WITH UNSTABLE GLUCOSE")
    try:
        access_token = api.simulate_demo_login(user_key)
        print(f"✅ Authentication: {bool(access_token)}")
        if not access_token:
            print("❌ Authentication failed")
            return False
        # Fetch the same 14-day window the data manager uses.
        end_date = datetime.now()
        start_date = end_date - timedelta(days=14)
        egv_data = api.get_egv_data(
            start_date.isoformat(),
            end_date.isoformat()
        )
        print(f"✅ EGV Data: {len(egv_data)} readings")
        if egv_data:
            print("✅ SUCCESS! API is working as before (with Sarah's unstable patterns)")
            sample = egv_data[0] if egv_data else {}
            print(f"Sample reading: {sample}")
            return True
        print("⚠️ API authenticated but returned no data")
        return False
    except Exception as e:
        print(f"❌ Error: {e}")
        return False
if __name__ == "__main__":
    # Run the standalone API smoke test when this module is executed directly.
    # (Removed a stray trailing "|" extraction artifact from the original line.)
    test_original_api_method()