Spaces:
Running
Running
#!/usr/bin/env python3 | |
""" | |
Enhanced GlucoBuddy Mistral Chat Integration | |
Improved rate limit handling and fallback strategies | |
""" | |
import os | |
import json | |
import logging | |
import sys | |
import time | |
from typing import Any, Dict, List, Optional, Union | |
from datetime import datetime, timedelta | |
import pandas as pd | |
from dataclasses import asdict | |
import requests | |
import random | |
import numpy as np | |
import warnings | |
# Load environment variables from .env file | |
from dotenv import load_dotenv | |
load_dotenv() | |
# Suppress pandas warnings | |
warnings.filterwarnings('ignore', category=RuntimeWarning) | |
warnings.filterwarnings('ignore', category=FutureWarning) | |
from apifunctions import ( | |
DexcomAPI, | |
GlucoseAnalyzer, | |
DEMO_USERS, | |
DemoUser, | |
format_glucose_data_for_display | |
) | |
# Setup logging: configure the root logger at INFO and create this module's
# logger, which the API client below uses for request/retry diagnostics.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Get configuration from environment variables (populated from .env by the
# load_dotenv() call at import time, when such a file exists).
# MISTRAL_API_KEY is required; validate_environment() reports it when missing.
MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY")
# MISTRAL_AGENT_ID is optional; without it the standard chat completion is used.
MISTRAL_AGENT_ID = os.getenv("MISTRAL_AGENT_ID")
# Free-form environment label, defaulting to "development".
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
# DEBUG is True only when the variable is the string "true" (case-insensitive).
DEBUG = os.getenv("DEBUG", "false").lower() == "true"
def validate_environment():
    """Check the required Mistral settings and print setup guidance.

    Reads the module-level ``MISTRAL_API_KEY`` and ``MISTRAL_AGENT_ID`` values.
    When the API key is missing, the missing variable names and setup
    instructions are printed (the Hugging Face variant is chosen when
    ``SPACE_ID`` is set in the environment) and ``False`` is returned.
    Otherwise a success message is printed, followed by a note on whether an
    agent ID is configured, and ``True`` is returned.
    """
    required_settings = {"MISTRAL_API_KEY": MISTRAL_API_KEY}
    absent = [name for name, value in required_settings.items() if not value]

    if not absent:
        print("β Environment validation passed!")
        # The agent ID is optional; it only selects which message is shown.
        print(
            "β Agent ID configured"
            if MISTRAL_AGENT_ID
            else "β οΈ No agent ID - will use standard chat completion"
        )
        return True

    print("β Missing required environment variables:")
    for name in absent:
        print(f" - {name}")
    print("\nπ‘ Setup instructions:")

    if os.getenv("SPACE_ID"):  # Hugging Face Space detection
        instructions = (
            "π€ For Hugging Face Spaces:",
            " 1. Go to Space settings",
            " 2. Add Repository secrets:",
            " 3. Set MISTRAL_API_KEY to your API key",
        )
    else:
        instructions = (
            "π» For local development:",
            " 1. Create a .env file:",
            " 2. Add: MISTRAL_API_KEY=your_api_key_here",
            " 3. Add: MISTRAL_AGENT_ID=your_agent_id_here",
        )
    for line in instructions:
        print(line)
    return False
class GlucoseDataGenerator:
    """Generate realistic mock glucose data for testing and demo purposes.

    None of the methods use instance state, so each one is declared as a
    ``@staticmethod``. That keeps ``GlucoseDataGenerator.method(...)`` calls
    working and additionally makes calls through an instance valid (without
    the decorator an instance call would pass the instance as ``days``/``hour``
    and fail).
    """

    @staticmethod
    def create_realistic_pattern(days: int = 14, user_type: str = "normal") -> List[Dict]:
        """Generate glucose data with realistic patterns.

        Args:
            days: Number of days of readings to synthesize; the series starts
                ``days`` days before the current time.
            user_type: ``"dawn_phenomenon"`` or ``"night_low"`` select a
                special daily curve; any other value uses the normal curve.

        Returns:
            A list with one dict per 5-minute reading (288 per day), carrying
            the keys ``systemTime``, ``displayTime``, ``value``, ``trend``,
            ``realtimeValue`` and ``smoothedValue``. Glucose values are plain
            Python ints clamped to the 50-400 range.
        """
        data_points = []
        start_time = datetime.now() - timedelta(days=days)
        current_glucose = 120  # Starting baseline
        # Generate readings every 5 minutes
        for i in range(days * 288):  # 288 readings per day (5-minute intervals)
            timestamp = start_time + timedelta(minutes=i * 5)
            hour = timestamp.hour
            # Simulate daily patterns
            daily_variation = GlucoseDataGenerator._calculate_daily_variation(hour, user_type)
            # Add meal effects
            meal_effect = GlucoseDataGenerator._calculate_meal_effects(hour, i)
            # Random variation
            random_noise = random.uniform(-10, 10)
            # Calculate final glucose value
            target_glucose = 120 + daily_variation + meal_effect + random_noise
            # Smooth transitions (glucose doesn't jump dramatically): move
            # only 30% of the way towards the target on each reading.
            glucose_change = (target_glucose - current_glucose) * 0.3
            current_glucose += glucose_change
            # Keep within realistic bounds
            current_glucose = max(50, min(400, current_glucose))
            # Determine trend from the (pre-clamp) step size
            trend = GlucoseDataGenerator._calculate_trend(glucose_change)
            # int(...) guarantees a plain Python int even when the running
            # value is a NumPy scalar, so the record stays JSON-serializable.
            reading = int(round(current_glucose))
            iso_time = timestamp.isoformat()
            data_points.append({
                'systemTime': iso_time,
                'displayTime': iso_time,
                'value': reading,
                'trend': trend,
                'realtimeValue': reading,
                'smoothedValue': reading,
            })
        return data_points

    @staticmethod
    def _calculate_daily_variation(hour: int, user_type: str) -> float:
        """Calculate glucose variation based on time of day.

        Args:
            hour: Hour of day (0-23).
            user_type: Pattern selector, see ``create_realistic_pattern``.

        Returns:
            The offset (as a plain float) to add to the 120 baseline.
        """
        if user_type == "dawn_phenomenon":
            if 4 <= hour <= 8:
                # Extra early-morning rise on top of a +30 offset.
                return float(30 + 20 * np.sin((hour - 4) * np.pi / 4))
            return float(10 * np.sin((hour - 12) * np.pi / 12))
        if user_type == "night_low":
            if 22 <= hour or hour <= 6:
                return -20.0
            return float(5 * np.sin((hour - 12) * np.pi / 12))
        # Normal pattern
        return float(15 * np.sin((hour - 6) * np.pi / 12))

    @staticmethod
    def _calculate_meal_effects(hour: int, reading_index: int) -> float:
        """Calculate glucose spikes from meals.

        Args:
            hour: Hour of day (0-23).
            reading_index: Index of the reading in the series; currently
                unused, kept for interface compatibility.

        Returns:
            The summed meal contribution for ``hour``.

        Note:
            With whole-hour inputs both ramps evaluate to 0 at one and two
            hours from a meal, so the effect is non-zero only at the meal
            hour itself (7, 12 or 18).
        """
        meal_times = [7, 12, 18]  # Breakfast, lunch, dinner
        meal_effect = 0
        for meal_time in meal_times:
            time_since_meal = abs(hour - meal_time)
            if time_since_meal > 2:
                continue
            if time_since_meal <= 1:
                meal_effect += 40 * (1 - time_since_meal)
            else:
                meal_effect += 20 * (2 - time_since_meal)
        return meal_effect

    @staticmethod
    def _calculate_trend(glucose_change: float) -> str:
        """Determine trend arrow based on glucose change.

        Args:
            glucose_change: Change applied to the glucose value for one
                5-minute step.

        Returns:
            One of ``'singleUp'``, ``'fortyFiveUp'``, ``'singleDown'``,
            ``'fortyFiveDown'`` or ``'flat'``.
        """
        if glucose_change > 5:
            return 'singleUp'
        if glucose_change > 2:
            return 'fortyFiveUp'
        if glucose_change < -5:
            return 'singleDown'
        if glucose_change < -2:
            return 'fortyFiveDown'
        return 'flat'
class EnhancedMistralAPIClient:
    """Enhanced Mistral API client with better rate limit handling.

    Wraps a ``requests.Session`` pointed at the Mistral REST API and adds:
    a minimum spacing between requests, retries with backoff, a model
    fallback chain for chat completions, and an agent-completion call.
    Public methods report failures through a ``{"success": False, ...}`` dict
    rather than raising.
    """

    # Upper bound, in seconds, for any single wait requested via Retry-After.
    MAX_RETRY_WAIT = 60

    def __init__(self, api_key: Optional[str] = None, agent_id: Optional[str] = None):
        """Create the client.

        Args:
            api_key: Mistral API key; falls back to the module-level
                ``MISTRAL_API_KEY``.
            agent_id: Optional agent ID; falls back to ``MISTRAL_AGENT_ID``.

        Raises:
            ValueError: If no API key is available from either source.
        """
        self.api_key = api_key or MISTRAL_API_KEY
        self.agent_id = agent_id or MISTRAL_AGENT_ID
        if not self.api_key:
            raise ValueError("Mistral API key is required. Please set MISTRAL_API_KEY environment variable.")
        self.base_url = "https://api.mistral.ai/v1"
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        })
        # Rate limit handling
        self.last_request_time = 0
        self.min_request_interval = 1.0  # Minimum seconds between requests
        # Model fallback chain, tried in order by chat_completion()
        self.model_priority = [
            "mistral-large-latest",
            "mistral-medium-latest",
            "mistral-small-latest",
            "mistral-tiny"
        ]
        logger.info("Enhanced MistralAPIClient initialized with rate limit handling")

    def test_connection(self) -> Dict[str, Any]:
        """Test API connection with lightweight request.

        Returns:
            ``{"success": bool, "message": str}`` describing the outcome.
            Network problems are reported in the message, never raised.
        """
        try:
            response = self.session.post(
                f"{self.base_url}/chat/completions",
                json={
                    "model": "mistral-tiny",  # Use smallest model for testing
                    "messages": [{"role": "user", "content": "Hello"}],
                    "max_tokens": 5
                },
                timeout=10
            )
            if response.status_code == 200:
                return {"success": True, "message": "API connection successful"}
            elif response.status_code == 401:
                return {"success": False, "message": "Invalid API key"}
            elif response.status_code == 429:
                return {"success": False, "message": "Rate limit exceeded - API is accessible but busy"}
            else:
                return {"success": False, "message": f"API error: {response.status_code}"}
        except requests.exceptions.Timeout:
            return {"success": False, "message": "Connection timeout"}
        except requests.exceptions.RequestException as e:
            return {"success": False, "message": f"Network error: {str(e)}"}
        except Exception as e:
            # Deliberate catch-all: a status probe must not raise.
            return {"success": False, "message": f"Unexpected error: {str(e)}"}

    def _wait_for_rate_limit(self):
        """Ensure minimum time between requests.

        Sleeps for whatever remains of ``min_request_interval`` since the
        previous call, then records the current time as the last request.
        """
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        if time_since_last < self.min_request_interval:
            sleep_time = self.min_request_interval - time_since_last
            logger.debug(f"Rate limiting: waiting {sleep_time:.2f}s")
            time.sleep(sleep_time)
        self.last_request_time = time.time()

    @staticmethod
    def _parse_retry_after(header_value, default):
        """Interpret a Retry-After header as a whole number of seconds.

        The header may legally hold an HTTP-date instead of a number, or be
        absent; calling ``int()`` on it directly would raise. Anything that is
        not a non-negative integer yields ``default``.

        Args:
            header_value: Raw header value, or ``None`` when absent.
            default: Value to return when the header is missing or unusable.

        Returns:
            The parsed number of seconds, or ``default``.
        """
        if header_value is None:
            return default
        try:
            seconds = int(str(header_value).strip())
        except ValueError:
            return default
        return seconds if seconds >= 0 else default

    def chat_completion(self, messages: List[Dict], model: str = None, max_retries: int = 3) -> Dict[str, Any]:
        """Enhanced chat completion with retry logic and model fallback.

        Args:
            messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.
            model: Specific model to use; when omitted every model in
                ``model_priority`` is tried in order.
            max_retries: Attempts per model before moving to the next one.

        Returns:
            On success: ``success``, ``response``, ``usage``, ``model_used``
            and ``attempt``. On failure: ``success`` (False), ``error``,
            ``suggestion`` and ``details`` (the last underlying problem seen,
            or ``None``). An HTTP 401 returns immediately, since retrying
            with the same key cannot succeed.
        """
        models_to_try = [model] if model else self.model_priority
        last_error = None

        for model_name in models_to_try:
            for attempt in range(max_retries):
                is_last_attempt = attempt >= max_retries - 1
                try:
                    # Rate limiting
                    self._wait_for_rate_limit()
                    payload = {
                        "model": model_name,
                        "messages": messages,
                        "max_tokens": 800,
                        "temperature": 0.7
                    }
                    logger.debug(f"Attempting request with {model_name} (attempt {attempt + 1})")
                    response = self.session.post(
                        f"{self.base_url}/chat/completions",
                        json=payload,
                        timeout=30
                    )
                    status = response.status_code

                    if status == 200:
                        try:
                            result = response.json()
                            content = result["choices"][0]["message"]["content"]
                        except (ValueError, KeyError, IndexError, TypeError) as parse_error:
                            # A 200 with an unusable body: give up on this
                            # model instead of letting the error escape.
                            last_error = f"Malformed response from {model_name}: {parse_error}"
                            logger.error(last_error)
                            break
                        logger.info(f"β Success with {model_name}")
                        return {
                            "success": True,
                            "response": content,
                            "usage": result.get("usage", {}),
                            "model_used": model_name,
                            "attempt": attempt + 1
                        }

                    if status == 401:
                        # Authentication does not depend on the model or on
                        # timing, so retrying only wastes time.
                        error_detail = self._extract_error_message(response)
                        logger.error(f"Authentication failed (401): {error_detail}")
                        return {
                            "success": False,
                            "error": f"Invalid API key (401): {error_detail}",
                            "suggestion": "Check that MISTRAL_API_KEY is set to a valid key."
                        }

                    if status == 429:
                        # Rate limit exceeded
                        retry_after = self._parse_retry_after(
                            response.headers.get('Retry-After'), 2 ** attempt
                        )
                        wait_time = min(retry_after, self.MAX_RETRY_WAIT)  # Cap at 60 seconds
                        last_error = f"Rate limit (429) with {model_name}"
                        logger.warning(f"Rate limit hit with {model_name}, waiting {wait_time}s (attempt {attempt + 1})")
                        if is_last_attempt:
                            # Try next model if available
                            break
                        time.sleep(wait_time)
                        continue

                    if status == 422:
                        # Treated here as "this model cannot serve the
                        # request": skip straight to the next model.
                        # NOTE(review): confirm 422 really signals capacity.
                        last_error = f"HTTP 422 from {model_name}"
                        logger.warning(f"Model {model_name} capacity exceeded, trying next model")
                        break

                    error_detail = self._extract_error_message(response)
                    last_error = f"API error {status} with {model_name}: {error_detail}"
                    if is_last_attempt:
                        logger.error(f"API error {status} with {model_name}: {error_detail}")
                        break
                    logger.warning(f"API error {status} with {model_name}, retrying...")
                    time.sleep(2 ** attempt)  # Exponential backoff
                except requests.exceptions.Timeout:
                    last_error = f"Timeout with {model_name}"
                    if is_last_attempt:
                        logger.error(f"Timeout with {model_name} after {max_retries} attempts")
                        break
                    logger.warning(f"Timeout with {model_name}, retrying...")
                    time.sleep(2 ** attempt)
                except requests.exceptions.RequestException as e:
                    last_error = f"Network error with {model_name}: {str(e)}"
                    if is_last_attempt:
                        logger.error(f"Network error with {model_name}: {str(e)}")
                        break
                    logger.warning(f"Network error with {model_name}, retrying...")
                    time.sleep(2 ** attempt)

        # All models and retries failed
        return {
            "success": False,
            "error": "All models are currently experiencing high demand. Please try again in a few minutes.",
            "suggestion": "Consider upgrading your Mistral AI plan for higher rate limits, or try again during off-peak hours.",
            "details": last_error
        }

    def agent_completion(self, messages: List[Dict]) -> Dict[str, Any]:
        """Enhanced agent completion with retry logic.

        Args:
            messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.

        Returns:
            ``{"success": True, "response": str}`` on success, otherwise
            ``{"success": False, "error": str}``.
        """
        if not self.agent_id:
            return {"success": False, "error": "No agent ID configured"}

        max_retries = 2  # Fewer retries for agent calls
        for attempt in range(max_retries):
            is_last_attempt = attempt >= max_retries - 1
            try:
                self._wait_for_rate_limit()
                payload = {
                    "agent_id": self.agent_id,
                    "messages": messages,
                    "max_tokens": 800
                }
                response = self.session.post(
                    f"{self.base_url}/agents/completions",
                    json=payload,
                    timeout=30
                )
                if response.status_code == 200:
                    result = response.json()
                    return {
                        "success": True,
                        "response": result["choices"][0]["message"]["content"]
                    }
                if response.status_code == 429:
                    if not is_last_attempt:
                        retry_after = min(
                            self._parse_retry_after(response.headers.get('Retry-After'), 5),
                            self.MAX_RETRY_WAIT,
                        )
                        logger.warning(f"Agent rate limit, waiting {retry_after}s")
                        time.sleep(retry_after)
                    # On the last attempt this falls out of the loop to the
                    # generic failure result below.
                    continue
                error_detail = self._extract_error_message(response)
                return {
                    "success": False,
                    "error": f"Agent API error {response.status_code}: {error_detail}"
                }
            except Exception as e:
                # Deliberate best-effort: callers fall back to the plain chat
                # completion when the agent call fails for any reason.
                if is_last_attempt:
                    return {"success": False, "error": f"Agent request failed: {str(e)}"}
                time.sleep(2)
        return {"success": False, "error": "Agent request failed after retries"}

    def _extract_error_message(self, response) -> str:
        """Extract error message from API response.

        Args:
            response: Object exposing ``json()`` and ``text`` like a
                ``requests.Response``.

        Returns:
            The ``message`` (or else ``error``) field of a JSON object body,
            always as a string; for any other body the first 200 characters
            of the raw text, or ``"Unknown error"`` when the body is empty.
        """
        try:
            error_data = response.json()
        except ValueError:
            # Body is not JSON (JSON decode errors derive from ValueError).
            error_data = None
        if isinstance(error_data, dict):
            message = error_data.get("message", error_data.get("error", "Unknown error"))
            # The field may itself be a nested object; keep the str contract.
            return message if isinstance(message, str) else str(message)
        return response.text[:200] if response.text else "Unknown error"
class GlucoBuddyMistralChat:
    """
    Enhanced chat interface with better error handling and user feedback.

    Holds the currently loaded user, glucose data, statistics and patterns
    (assigned from outside this class), builds a system prompt from them and
    sends the conversation to Mistral through ``EnhancedMistralAPIClient``.
    """

    def __init__(self, mistral_api_key: str = None, mistral_agent_id: str = None):
        """Create the chat wrapper and its API client.

        Args:
            mistral_api_key: Passed through to ``EnhancedMistralAPIClient``.
            mistral_agent_id: Passed through to ``EnhancedMistralAPIClient``.
        """
        self.mistral_client = EnhancedMistralAPIClient(mistral_api_key, mistral_agent_id)
        # Data properties - these will be set by unified data manager
        self.current_user: Optional[DemoUser] = None
        self.current_glucose_data: Optional[pd.DataFrame] = None
        self.current_stats: Optional[Dict] = None
        self.current_patterns: Optional[Dict] = None
        # Chat state
        self.conversation_history = []
        self.max_history = 10
        # Error tracking
        self.consecutive_errors = 0
        self.last_successful_model = None
        self.logger = logging.getLogger(self.__class__.__name__)

    def test_connection(self) -> Dict[str, Any]:
        """Test Mistral API connection by delegating to the API client."""
        return self.mistral_client.test_connection()

    def get_context_summary(self) -> Dict[str, Any]:
        """Get current context for chat - uses data set by unified manager.

        Returns:
            A dict with ``user``, ``statistics``, ``patterns``, ``data_points``
            and ``recent_readings``; or ``{"error": ...}`` when no user/stats
            are loaded or the context cannot be built.
        """
        if not self.current_user or not self.current_stats:
            return {"error": "No user data loaded"}
        try:
            user = self.current_user
            frame = self.current_glucose_data
            return {
                "user": {
                    "name": user.name,
                    "age": user.age,
                    "diabetes_type": user.diabetes_type,
                    "device_type": user.device_type,
                    "years_with_diabetes": user.years_with_diabetes,
                    "typical_pattern": getattr(user, 'typical_glucose_pattern', 'normal')
                },
                "statistics": self._safe_convert_to_json(self.current_stats),
                "patterns": self._safe_convert_to_json(self.current_patterns),
                "data_points": len(frame) if frame is not None else 0,
                "recent_readings": self._safe_extract_recent_readings(frame)
            }
        except Exception as e:
            # Best-effort: a broken context degrades to a prompt without data.
            self.logger.error(f"Error building context: {e}")
            return {"error": f"Failed to build context: {str(e)}"}

    @staticmethod
    def _format_metric(value, missing: str = "N/A") -> str:
        """Format a statistic with one decimal, or ``missing`` if unusable.

        ``_safe_convert_to_json`` turns NaN statistics into ``None``; applying
        a ``.1f`` format to ``None`` raises ``TypeError``, so non-numeric and
        NaN values are rendered as ``missing`` instead.
        """
        if isinstance(value, bool) or not isinstance(value, (int, float, np.integer, np.floating)):
            return missing
        if value != value:  # NaN is the only value unequal to itself
            return missing
        return f"{value:.1f}"

    def build_system_prompt(self, context: Dict[str, Any]) -> str:
        """Build comprehensive system prompt with exact metrics.

        Args:
            context: Result of ``get_context_summary()``.

        Returns:
            The base instructions, followed either by a note that no data is
            loaded (when ``context`` has an ``error`` entry) or by the user
            profile and glucose statistics. Statistics that are absent from
            the dict render as ``0.0`` (``0`` for the reading count);
            statistics present but ``None``/NaN render as ``N/A``.
        """
        base_prompt = """You are GlucoBuddy, a helpful and encouraging diabetes management assistant.
Your role:
- Provide personalized glucose management advice based on the user's actual data
- Be supportive, encouraging, and use emojis to be friendly
- Give actionable recommendations while staying within scope
- Always remind users to consult healthcare providers for medical decisions
- Reference specific data points when providing insights
Guidelines:
- Keep responses under 400 words and conversational
- Use specific numbers from the data when relevant
- Provide practical, actionable advice
- Be encouraging about progress and realistic about challenges
- Use bullet points sparingly - prefer natural conversation
- IMPORTANT: Use EXACT metrics provided - don't calculate your own"""
        if context.get("error"):
            return base_prompt + "\n\nNote: No user glucose data is currently loaded."

        # "or {}" also covers keys that are present but set to None.
        user_info = context.get("user") or {}
        stats = context.get("statistics") or {}
        metric = self._format_metric
        total_readings = stats.get('total_readings', 0)
        if total_readings is None:
            total_readings = "N/A"

        context_addition = f"""
Current User: {user_info.get('name', 'Unknown')} ({user_info.get('age', 'N/A')} years old)
- Diabetes Type: {user_info.get('diabetes_type', 'Unknown')}
- Years with diabetes: {user_info.get('years_with_diabetes', 'Unknown')}
- Device: {user_info.get('device_type', 'Unknown')}
EXACT Glucose Data (14-day period):
- Average glucose: {metric(stats.get('average_glucose', 0))} mg/dL
- Time in range (70-180): {metric(stats.get('time_in_range_70_180', 0))}%
- Time below 70: {metric(stats.get('time_below_70', 0))}%
- Time above 180: {metric(stats.get('time_above_180', 0))}%
- Total readings: {total_readings}
- Glucose variability (std): {metric(stats.get('std_glucose', 0))} mg/dL
- GMI: {metric(stats.get('gmi', 0))}%
- CV: {metric(stats.get('cv', 0))}%
CRITICAL: Use these EXACT values in your responses. Do not recalculate or estimate."""
        return base_prompt + context_addition

    def chat_with_mistral(self, user_message: str, prefer_agent: bool = False) -> Dict[str, Any]:
        """Enhanced chat function with better error handling.

        Args:
            user_message: The user's text. ``None``, non-string and blank
                values are rejected with an error result.
            prefer_agent: Try the agent endpoint first and fall back to the
                chat completion API when it fails.

        Returns:
            On success: ``success``, ``response``, ``method`` and
            ``context_included`` (plus ``usage`` and ``model_used`` for chat
            completions). On failure: ``success`` (False) and ``error``,
            plus ``consecutive_errors`` (and ``suggestion`` when the API
            client supplied one) unless the message itself was rejected.
        """
        if not isinstance(user_message, str) or not user_message.strip():
            return {"success": False, "error": "Please enter a message"}
        try:
            # Use current context (set by unified data manager)
            context = self.get_context_summary()
            system_prompt = self.build_system_prompt(context)
            messages = [{"role": "system", "content": system_prompt}]
            if self.conversation_history:
                # Only the most recent max_history messages are sent, even
                # though up to twice that many are retained.
                messages.extend(self.conversation_history[-self.max_history:])
            messages.append({"role": "user", "content": user_message})
            context_included = not context.get("error")

            # Try agent first if preferred and available
            if prefer_agent:
                agent_result = self.mistral_client.agent_completion(messages)
                if agent_result["success"]:
                    self._update_conversation_history(user_message, agent_result["response"])
                    self.consecutive_errors = 0  # Reset error counter
                    return {
                        "success": True,
                        "response": agent_result["response"],
                        "method": "agent",
                        "context_included": context_included
                    }
                self.logger.warning(f"Agent failed: {agent_result['error']}")

            # Use enhanced chat completion API
            chat_result = self.mistral_client.chat_completion(messages)
            if not chat_result["success"]:
                self.consecutive_errors += 1
                # Provide helpful error messages based on the type of error
                return {
                    "success": False,
                    "error": self._get_user_friendly_error(chat_result["error"]),
                    "suggestion": chat_result.get("suggestion", ""),
                    "consecutive_errors": self.consecutive_errors
                }

            # History stores the raw reply, without the retry note added below.
            self._update_conversation_history(user_message, chat_result["response"])
            self.consecutive_errors = 0  # Reset error counter
            self.last_successful_model = chat_result.get("model_used")
            # Add helpful info if there were retries
            response = chat_result["response"]
            if chat_result.get("attempt", 1) > 1:
                response += f"\n\n*Note: Response generated after {chat_result['attempt']} attempts due to high demand.*"
            return {
                "success": True,
                "response": response,
                "method": "chat_completion",
                "context_included": context_included,
                "usage": chat_result.get("usage", {}),
                "model_used": chat_result.get("model_used")
            }
        except Exception as e:
            # Top-level boundary for the chat UI: log and report, never raise.
            self.logger.error(f"Chat error: {e}")
            self.consecutive_errors += 1
            return {
                "success": False,
                "error": "I'm experiencing technical difficulties. Please try again in a moment.",
                "consecutive_errors": self.consecutive_errors
            }

    def _get_user_friendly_error(self, error_msg: str) -> str:
        """Convert technical error messages to user-friendly ones.

        The first matching keyword group (case-insensitive) wins; unmatched
        messages get a generic reply.
        """
        error_lower = error_msg.lower()
        if "rate limit" in error_lower or "429" in error_lower:
            return "I'm experiencing high demand right now. Please wait a moment and try again."
        if "capacity exceeded" in error_lower or "service tier" in error_lower:
            return "The AI service is currently busy. Please try again in a few minutes."
        if "timeout" in error_lower:
            return "The response is taking longer than expected. Please try again."
        if "api key" in error_lower or "401" in error_lower:
            return "There's an authentication issue. Please contact support."
        if "network" in error_lower:
            return "I'm having trouble connecting. Please check your internet connection and try again."
        return "I'm experiencing technical difficulties. Please try again or rephrase your question."

    def _update_conversation_history(self, user_message: str, assistant_response: str):
        """Append one user/assistant exchange and trim the stored history.

        At most ``max_history * 2`` messages (``max_history`` exchanges) are
        retained; older ones are dropped.
        """
        self.conversation_history.extend([
            {"role": "user", "content": user_message},
            {"role": "assistant", "content": assistant_response}
        ])
        limit = self.max_history * 2
        if len(self.conversation_history) > limit:
            self.conversation_history = self.conversation_history[-limit:]

    def clear_conversation(self):
        """Clear conversation history and reset error counters."""
        self.conversation_history = []
        self.consecutive_errors = 0
        self.logger.info("Conversation history cleared")

    def get_status(self) -> Dict[str, Any]:
        """Get current system status with enhanced information.

        Note:
            This calls ``test_connection()``, which sends a request to the
            API on every invocation.
        """
        api_status = self.test_connection()
        frame = self.current_glucose_data
        return {
            "api_connected": api_status["success"],
            "api_message": api_status["message"],
            "user_loaded": self.current_user is not None,
            "data_available": frame is not None and not frame.empty,
            "conversation_messages": len(self.conversation_history),
            "current_user": self.current_user.name if self.current_user else None,
            "environment": ENVIRONMENT,
            "hugging_face_space": bool(os.getenv("SPACE_ID")),
            # Report the agent ID the client actually uses: it may have been
            # passed to the constructor instead of coming from the environment.
            "agent_available": bool(getattr(self.mistral_client, "agent_id", None)),
            "consecutive_errors": self.consecutive_errors,
            "last_successful_model": self.last_successful_model
        }

    def _safe_convert_to_json(self, obj):
        """Safely convert objects for JSON serialization.

        Recursively converts NumPy scalars and arrays, tuples, and pandas or
        stdlib datetimes into plain Python values. NaN (NumPy or built-in
        float) and ``NaT`` become ``None``. Anything unrecognized is returned
        unchanged.
        """
        if obj is None or obj is pd.NaT:
            return None
        if isinstance(obj, (bool, np.bool_)):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, (float, np.floating)):
            return None if np.isnan(obj) else float(obj)
        if isinstance(obj, dict):
            return {key: self._safe_convert_to_json(value) for key, value in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [self._safe_convert_to_json(item) for item in obj]
        if isinstance(obj, np.ndarray):
            return self._safe_convert_to_json(obj.tolist())
        if isinstance(obj, datetime):  # also matches pd.Timestamp
            return obj.isoformat()
        return obj

    @staticmethod
    def _is_missing(value) -> bool:
        """Return True for ``None``, blank strings and NaN/NaT scalars."""
        if value is None:
            return True
        if isinstance(value, str):
            return not value.strip()
        try:
            return bool(pd.isna(value))
        except (TypeError, ValueError):
            # Non-scalar values make pd.isna return an array; treat as present.
            return False

    def _safe_extract_recent_readings(self, df: pd.DataFrame, count: int = 5) -> List[Dict]:
        """Safely extract recent glucose readings.

        Args:
            df: Glucose readings; the columns ``displayTime``, ``systemTime``,
                ``value`` and ``trend`` are read when present.
            count: Number of rows to take from the end of ``df``.

        Returns:
            A list of ``{"time", "glucose", "trend"}`` dicts. Rows that fail
            to convert are skipped with a warning; any other failure yields
            an empty list.
        """
        if df is None or df.empty:
            return []
        try:
            readings = []
            for idx, row in df.tail(count).iterrows():
                try:
                    # NaN/NaT are truthy, so "a or b" would never fall back
                    # to systemTime for a missing displayTime.
                    display_time = row.get('displayTime')
                    if self._is_missing(display_time):
                        display_time = row.get('systemTime')

                    if self._is_missing(display_time):
                        time_str = datetime.now().isoformat()
                    elif isinstance(display_time, str):
                        time_str = display_time
                    else:
                        time_str = pd.to_datetime(display_time).isoformat()

                    glucose_value = row.get('value')
                    glucose_clean = (
                        self._safe_convert_to_json(glucose_value)
                        if pd.notna(glucose_value)
                        else None
                    )
                    trend_value = row.get('trend', 'flat')
                    trend_clean = str(trend_value) if pd.notna(trend_value) else 'flat'

                    readings.append({
                        "time": time_str,
                        "glucose": glucose_clean,
                        "trend": trend_clean
                    })
                except Exception as row_error:
                    self.logger.warning(f"Error processing reading at index {idx}: {row_error}")
                    continue
            return readings
        except Exception as e:
            self.logger.error(f"Error extracting recent readings: {e}")
            return []
# Update the main.py chat handler to use enhanced error messages | |
def enhanced_chat_error_handler(app, message, history):
    """Enhanced error handler for chat interactions.

    Calls ``app.chat_with_mistral(message, history)`` and, once more than two
    consecutive errors have been recorded on ``app.mistral_chat``, appends a
    hint to the last bot reply when that reply looks like an error message.

    Args:
        app: Object exposing ``chat_with_mistral(message, history)`` and a
            ``mistral_chat`` attribute.
        message: The user's message, forwarded unchanged.
        history: The chat history, forwarded unchanged.

    Returns:
        The result of ``app.chat_with_mistral``; ``result[1]`` is treated as
        a list of ``(user, bot)`` pairs whose last entry may be amended.
        NOTE(review): the meaning of ``result[0]`` is defined by the caller's
        ``chat_with_mistral``; a falsy value short-circuits as before.
    """
    result = app.chat_with_mistral(message, history)
    if not result[0]:  # If message is empty, return as-is
        return result

    # If there were consecutive errors, add helpful message
    if getattr(app.mistral_chat, 'consecutive_errors', 0) <= 2:
        return result

    error_help = "\n\nπ‘ *Multiple errors detected. This usually indicates high API demand. Consider trying again later or during off-peak hours.*"
    chat_history = result[1]
    if not chat_history:
        return result

    last_turn = chat_history[-1]
    if not isinstance(last_turn, (list, tuple)) or len(last_turn) < 2:
        return result
    last_response = last_turn[1]
    # The bot slot can be None (no reply yet); substring tests need a str.
    if not isinstance(last_response, str):
        return result

    if "technical difficulties" in last_response or "try again" in last_response:
        amended = last_response + error_help
        if isinstance(last_turn, list):
            last_turn[1] = amended  # mutate in place, as the original did
        elif isinstance(chat_history, list):
            # Tuple pairs are immutable: swap in a rebuilt pair instead.
            chat_history[-1] = (last_turn[0], amended) + tuple(last_turn[2:])
    return result
# Legacy compatibility | |
def create_enhanced_cli():
    """Enhanced CLI with better error handling.

    Validates the environment, builds a ``GlucoBuddyMistralChat``, probes the
    API connection, then runs a read-eval-print loop on standard input until
    the user types ``/quit``, presses Ctrl+C, or standard input is closed.
    Returns ``None`` in every case; setup failures are printed, not raised.
    """
    print("π©Ί GlucoBuddy Chat Interface (Enhanced)")
    print("=" * 50)
    if not validate_environment():
        print("β Environment validation failed. Please check your configuration.")
        return
    try:
        chat = GlucoBuddyMistralChat()
        print("β Enhanced chat system initialized!")
    except Exception as e:
        print(f"β Failed to initialize chat system: {e}")
        return

    # Test connection
    print("\nπ Testing Mistral API connection...")
    connection_test = chat.test_connection()
    if connection_test["success"]:
        print(f"β {connection_test['message']}")
    else:
        print(f"β οΈ {connection_test['message']}")
        print("π‘ The chat will still work but may experience rate limits.")

    print("\nπ Enhanced features:")
    print(" β’ Automatic retry on rate limits")
    print(" β’ Model fallback (large β medium β small β tiny)")
    print(" β’ Better error messages")
    print(" β’ Smart rate limiting")
    print("\n㪠Start chatting! (Type /quit to exit)")
    print("=" * 50)

    while True:
        try:
            user_input = input("\nπ«΅ You: ").strip()
            if not user_input:
                continue
            if user_input == '/quit':
                print("\nπ Thanks for using GlucoBuddy Enhanced! π")
                break
            print("π€ Processing...")
            result = chat.chat_with_mistral(user_input)
            if result['success']:
                model_info = f" [{result.get('model_used', 'unknown')}]" if result.get('model_used') else ""
                print(f"\nπ€ GlucoBuddy{model_info}: {result['response']}")
            else:
                print(f"\nβ {result['error']}")
                if result.get('suggestion'):
                    print(f"π‘ {result['suggestion']}")
        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin was closed (piped input ended, Ctrl+D). If the
            # generic handler below caught it, input() would raise again on
            # every iteration and the loop would never terminate.
            print("\nπ Goodbye!")
            break
        except Exception as e:
            # Keep the session alive after an unexpected per-message failure.
            print(f"\nβ Unexpected error: {e}")
def main():
    """Print the banner, validate the environment and launch the CLI.

    Nothing beyond the banner is printed when ``validate_environment()``
    reports a problem; in that case the function simply returns.
    """
    banner = "π©Ί GlucoBuddy Enhanced - Better Rate Limit Handling"
    print(banner)
    print("=" * 60)

    if validate_environment():
        feature_lines = (
            "π Enhanced features:",
            " β Automatic retry with exponential backoff",
            " β Model fallback chain (large β small)",
            " β Smart rate limiting",
            " β User-friendly error messages",
        )
        for line in feature_lines:
            print(line)
        create_enhanced_cli()
# Script entry point: start the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()