test-app / src /dashboard_api.py
SmartHeal's picture
Upload 16 files
00c982c verified
from flask import Flask, jsonify, request
from flask_cors import CORS
import logging
from datetime import datetime, timedelta
import json
from typing import Dict, Any, List, Optional
import threading
import time
from .dashboard_database_manager import DashboardDatabaseManager
class DashboardAPI:
"""API layer for dashboard integration"""
def __init__(self, database_manager: DashboardDatabaseManager, port: int = 5001):
self.database_manager = database_manager
self.port = port
self.app = Flask(__name__)
CORS(self.app) # Enable CORS for all routes
# Configure logging
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
# Setup routes
self._setup_routes()
# Background thread for API server
self.api_thread = None
self.running = False
def _setup_routes(self):
"""Setup API routes for dashboard integration"""
@self.app.route('/api/health', methods=['GET'])
def health_check():
"""Health check endpoint"""
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'service': 'SmartHeal Bot API'
})
@self.app.route('/api/bot/analytics', methods=['GET'])
def get_bot_analytics():
"""Get comprehensive bot analytics for dashboard"""
try:
analytics_data = self.database_manager.get_analytics_data()
# Add trend data for charts
analytics_data.update(self._get_trend_data())
return jsonify({
'success': True,
'data': analytics_data,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
self.logger.error(f"Error getting bot analytics: {e}")
return jsonify({
'success': False,
'error': str(e),
'timestamp': datetime.now().isoformat()
}), 500
@self.app.route('/api/bot/analytics/details/<int:analysis_id>', methods=['GET'])
def get_analysis_details(analysis_id):
"""Get detailed analysis information"""
try:
query = "SELECT * FROM ai_analyses WHERE id = %s"
analysis = self.database_manager.execute_query_one(query, (analysis_id,))
if not analysis:
return jsonify({
'success': False,
'error': 'Analysis not found'
}), 404
# Convert datetime objects to strings for JSON serialization
if analysis.get('created_at'):
analysis['created_at'] = analysis['created_at'].isoformat()
return jsonify({
'success': True,
'data': analysis,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
self.logger.error(f"Error getting analysis details: {e}")
return jsonify({
'success': False,
'error': str(e)
}), 500
@self.app.route('/api/bot/interactions', methods=['GET'])
def get_bot_interactions():
"""Get bot interaction history"""
try:
limit = request.args.get('limit', 50, type=int)
interactions = self.database_manager.get_interaction_history(limit)
# Convert datetime objects for JSON serialization
for interaction in interactions:
if interaction.get('interacted_at'):
interaction['interacted_at'] = interaction['interacted_at'].isoformat()
return jsonify({
'success': True,
'data': interactions,
'count': len(interactions),
'timestamp': datetime.now().isoformat()
})
except Exception as e:
self.logger.error(f"Error getting bot interactions: {e}")
return jsonify({
'success': False,
'error': str(e)
}), 500
@self.app.route('/api/bot/sessions', methods=['GET'])
def get_session_analytics():
"""Get session analytics"""
try:
session_data = self.database_manager.get_session_analytics()
return jsonify({
'success': True,
'data': session_data,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
self.logger.error(f"Error getting session analytics: {e}")
return jsonify({
'success': False,
'error': str(e)
}), 500
@self.app.route('/api/bot/stats/summary', methods=['GET'])
def get_summary_stats():
"""Get summary statistics for dashboard widgets"""
try:
# Get basic counts
total_analyses = self.database_manager.execute_query_one("SELECT COUNT(*) as count FROM ai_analyses")
total_patients = self.database_manager.execute_query_one("SELECT COUNT(DISTINCT patient_id) as count FROM bot_interactions WHERE patient_id IS NOT NULL")
total_sessions = self.database_manager.execute_query_one("SELECT COUNT(*) as count FROM analysis_sessions")
# Get today's activity
today_analyses = self.database_manager.execute_query_one("""
SELECT COUNT(*) as count FROM ai_analyses
WHERE DATE(created_at) = CURDATE()
""")
# Get average metrics
avg_processing_time = self.database_manager.execute_query_one("""
SELECT AVG(processing_time) as avg_time FROM ai_analyses
WHERE processing_time IS NOT NULL
""")
avg_risk_score = self.database_manager.execute_query_one("""
SELECT AVG(risk_score) as avg_risk FROM ai_analyses
WHERE risk_score IS NOT NULL
""")
summary = {
'total_analyses': total_analyses['count'] if total_analyses else 0,
'total_patients': total_patients['count'] if total_patients else 0,
'total_sessions': total_sessions['count'] if total_sessions else 0,
'today_analyses': today_analyses['count'] if today_analyses else 0,
'avg_processing_time': round(avg_processing_time['avg_time'], 2) if avg_processing_time and avg_processing_time['avg_time'] else 0,
'avg_risk_score': round(avg_risk_score['avg_risk'], 1) if avg_risk_score and avg_risk_score['avg_risk'] else 0
}
return jsonify({
'success': True,
'data': summary,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
self.logger.error(f"Error getting summary stats: {e}")
return jsonify({
'success': False,
'error': str(e)
}), 500
@self.app.route('/api/bot/models/performance', methods=['GET'])
def get_model_performance():
"""Get AI model performance metrics"""
try:
performance_data = self.database_manager.execute_query("""
SELECT
model_version,
COUNT(*) as total_analyses,
AVG(processing_time) as avg_processing_time,
AVG(risk_score) as avg_risk_score,
MIN(created_at) as first_used,
MAX(created_at) as last_used
FROM ai_analyses
WHERE model_version IS NOT NULL
GROUP BY model_version
ORDER BY last_used DESC
""", fetch=True)
# Convert datetime objects for JSON serialization
for model in performance_data:
if model.get('first_used'):
model['first_used'] = model['first_used'].isoformat()
if model.get('last_used'):
model['last_used'] = model['last_used'].isoformat()
if model.get('avg_processing_time'):
model['avg_processing_time'] = round(model['avg_processing_time'], 2)
if model.get('avg_risk_score'):
model['avg_risk_score'] = round(model['avg_risk_score'], 1)
return jsonify({
'success': True,
'data': performance_data or [],
'timestamp': datetime.now().isoformat()
})
except Exception as e:
self.logger.error(f"Error getting model performance: {e}")
return jsonify({
'success': False,
'error': str(e)
}), 500
def _get_trend_data(self) -> Dict[str, Any]:
"""Get trend data for dashboard charts"""
try:
# Get last 30 days of analysis data
trend_data = self.database_manager.execute_query("""
SELECT
DATE(created_at) as analysis_date,
COUNT(*) as count
FROM ai_analyses
WHERE created_at >= DATE_SUB(CURDATE(), INTERVAL 30 DAY)
GROUP BY DATE(created_at)
ORDER BY analysis_date
""", fetch=True)
# Prepare data for Chart.js
labels = []
data = []
if trend_data:
for row in trend_data:
labels.append(row['analysis_date'].strftime('%Y-%m-%d'))
data.append(row['count'])
# Get risk level distribution
risk_distribution = self.database_manager.execute_query("""
SELECT risk_level, COUNT(*) as count
FROM ai_analyses
GROUP BY risk_level
""", fetch=True)
risk_labels = []
risk_data = []
if risk_distribution:
for row in risk_distribution:
risk_labels.append(row['risk_level'])
risk_data.append(row['count'])
# Get processing time distribution
processing_time_data = self.database_manager.execute_query("""
SELECT
CASE
WHEN processing_time < 1 THEN '< 1s'
WHEN processing_time < 2 THEN '1-2s'
WHEN processing_time < 5 THEN '2-5s'
WHEN processing_time < 10 THEN '5-10s'
ELSE '> 10s'
END as time_range,
COUNT(*) as count
FROM ai_analyses
WHERE processing_time IS NOT NULL
GROUP BY time_range
ORDER BY
CASE
WHEN processing_time < 1 THEN 1
WHEN processing_time < 2 THEN 2
WHEN processing_time < 5 THEN 3
WHEN processing_time < 10 THEN 4
ELSE 5
END
""", fetch=True)
processing_labels = []
processing_data = []
if processing_time_data:
for row in processing_time_data:
processing_labels.append(row['time_range'])
processing_data.append(row['count'])
return {
'trend_labels': labels,
'trend_data': data,
'risk_level_labels': risk_labels,
'risk_level_data': risk_data,
'processing_time_labels': processing_labels,
'processing_time_data': processing_data
}
except Exception as e:
self.logger.error(f"Error getting trend data: {e}")
return {
'trend_labels': [],
'trend_data': [],
'risk_level_labels': [],
'risk_level_data': [],
'processing_time_labels': [],
'processing_time_data': []
}
def start_api_server(self):
"""Start the API server in a background thread"""
if self.running:
self.logger.warning("API server is already running")
return
def run_server():
try:
self.logger.info(f"Starting SmartHeal Bot API server on port {self.port}")
self.app.run(host='0.0.0.0', port=self.port, debug=False, threaded=True)
except Exception as e:
self.logger.error(f"Error starting API server: {e}")
self.running = True
self.api_thread = threading.Thread(target=run_server, daemon=True)
self.api_thread.start()
# Give the server a moment to start
time.sleep(1)
self.logger.info(f"βœ… SmartHeal Bot API server started on http://0.0.0.0:{self.port}")
def stop_api_server(self):
"""Stop the API server"""
self.running = False
if self.api_thread and self.api_thread.is_alive():
self.logger.info("Stopping SmartHeal Bot API server")
# Note: Flask development server doesn't have a clean shutdown method
# In production, you would use a proper WSGI server like Gunicorn
def is_running(self) -> bool:
"""Check if the API server is running"""
return self.running and self.api_thread and self.api_thread.is_alive()
class DashboardIntegrationManager:
"""Manager class for dashboard integration functionality"""
def __init__(self, database_manager: DashboardDatabaseManager):
self.database_manager = database_manager
self.api = DashboardAPI(database_manager)
self.logger = logging.getLogger(__name__)
def start_integration(self):
"""Start dashboard integration services"""
try:
self.api.start_api_server()
self.logger.info("βœ… Dashboard integration started successfully")
except Exception as e:
self.logger.error(f"❌ Failed to start dashboard integration: {e}")
def stop_integration(self):
"""Stop dashboard integration services"""
try:
self.api.stop_api_server()
self.logger.info("βœ… Dashboard integration stopped")
except Exception as e:
self.logger.error(f"❌ Error stopping dashboard integration: {e}")
def log_analysis_session(self, session_data: Dict[str, Any]) -> Optional[int]:
"""Log an analysis session for dashboard tracking"""
try:
session_id = self.database_manager.save_analysis_session(session_data)
if session_id:
self.logger.info(f"βœ… Analysis session logged with ID: {session_id}")
return session_id
except Exception as e:
self.logger.error(f"❌ Error logging analysis session: {e}")
return None
def log_bot_interaction(self, interaction_data: Dict[str, Any]) -> Optional[int]:
"""Log a bot interaction for dashboard tracking"""
try:
interaction_id = self.database_manager.save_bot_interaction(interaction_data)
if interaction_id:
self.logger.info(f"βœ… Bot interaction logged with ID: {interaction_id}")
return interaction_id
except Exception as e:
self.logger.error(f"❌ Error logging bot interaction: {e}")
return None
def get_integration_status(self) -> Dict[str, Any]:
"""Get the status of dashboard integration"""
return {
'api_running': self.api.is_running(),
'database_connected': self.database_manager.get_connection() is not None,
'timestamp': datetime.now().isoformat()
}