# tutor-api / main.py
# Maintainer: rairo — last commit "Update main.py" (f8558c4, verified)
# ---- Main Application File: main.py ----
import io
import uuid
import re
import time
import tempfile
import requests
import json
import os
import logging
import traceback
from datetime import datetime
from pathlib import Path
import urllib.parse
from flask import Flask, request, jsonify, send_file, Response
from flask_cors import CORS
from supabase import create_client, Client
# --- Input Processing & AI Libraries ---
import google.generativeai as genai
from elevenlabs.client import ElevenLabs
from elevenlabs import save as save_elevenlabs_audio
from PyPDF2 import PdfReader
import wikipedia
from youtube_transcript_api import YouTubeTranscriptApi
import arxiv # For ArXiv
from elevenlabs import play, stream, save
import math
import pydub
import logging
import traceback
import uuid
from io import BytesIO # To handle in-memory bytes
# --- Environment Variables ---
# Load environment variables if using a .env file (optional, good practice)
from dotenv import load_dotenv
load_dotenv()
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY") # Use service role key for admin-like backend tasks
SUPABASE_ANON_KEY = os.getenv("SUPABASE_ANON_KEY") # Use anon key for client-side actions if needed, but prefer service key for backend logic
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY")
# --- Initialize Flask app and CORS ---
app = Flask(__name__)
CORS(app) # Allow all origins for simplicity in development
# --- Initialize Supabase Client ---
# Each third-party client below is initialized defensively: on failure the
# handle is set to None and every endpoint checks for None before use,
# returning 503 instead of crashing at import time.
try:
    if not SUPABASE_URL or not SUPABASE_SERVICE_KEY:
        raise ValueError("Supabase URL and Service Key must be set in environment variables.")
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
    print("Supabase client initialized successfully.")
    # Example table check (optional)
    # response = supabase.table('users').select("id", count='exact').limit(0).execute()
    # print("Checked 'users' table connection.")
except Exception as e:
    print(f"Error initializing Supabase client: {e}")
    # Depending on your setup, you might want to exit or handle this differently
    supabase = None # Indicate client is not available
# --- Initialize Gemini API ---
try:
    if not GEMINI_API_KEY:
        raise ValueError("Gemini API Key must be set in environment variables.")
    genai.configure(api_key=GEMINI_API_KEY)
    # Use a generally available model, adjust if you have access to specific previews
    gemini_model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
    print("Gemini API initialized successfully.")
except Exception as e:
    print(f"Error initializing Gemini API: {e}")
    gemini_model = None
# --- Initialize ElevenLabs Client ---
try:
    if not ELEVENLABS_API_KEY:
        raise ValueError("ElevenLabs API Key must be set in environment variables.")
    elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)
    print("ElevenLabs client initialized successfully.")
    # Optional: Check available voices
    # voices = elevenlabs_client.voices.get_all()
    # print(f"Available ElevenLabs voices: {[v.name for v in voices.voices]}")
except Exception as e:
    print(f"Error initializing ElevenLabs client: {e}")
    elevenlabs_client = None
# --- Logging ---
# File-based logging; /tmp is writable on most PaaS containers but is ephemeral.
LOG_FILE_PATH = "/tmp/ai_tutor.log" # Adjust path as needed
logging.basicConfig(filename=LOG_FILE_PATH, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# === Helper Functions ===
def verify_token(auth_header):
    """Validate a Supabase JWT taken from an Authorization header.

    Returns a ``(user, error)`` pair: on success ``error`` is None; on any
    failure ``user`` is None and ``error`` is a dict with 'error' and
    'status' keys ready to be turned into a JSON response.
    """
    if not supabase:
        raise ConnectionError("Supabase client not initialized.")
    if not (auth_header and auth_header.startswith('Bearer ')):
        return None, {'error': 'Missing or invalid Authorization header', 'status': 401}
    jwt = auth_header.split(' ')[1]
    try:
        auth_response = supabase.auth.get_user(jwt)
        current_user = auth_response.user
        if current_user:
            return current_user, None
        return None, {'error': 'Invalid or expired token', 'status': 401}
    except Exception as e:
        logging.error(f"Token verification error: {e}")
        # Differentiate between specific Supabase errors if needed
        return None, {'error': f'Token verification failed: {e}', 'status': 401}
def verify_admin(user):
    """Return (True, None) when *user* has the 'is_admin' flag set in profiles.

    Otherwise returns (False, error_dict) with an appropriate HTTP status:
    400 for a missing user, 403 for non-admins, 500 on lookup failure.
    """
    if not supabase:
        raise ConnectionError("Supabase client not initialized.")
    if not user:
        return False, {'error': 'User not provided for admin check', 'status': 400}
    try:
        # The admin flag lives in the 'profiles' table keyed by auth user id.
        result = supabase.table('profiles').select('is_admin').eq('id', user.id).maybe_single().execute()
        row = result.data
        if row and row.get('is_admin'):
            return True, None
        return False, {'error': 'Admin access required', 'status': 403} # 403 Forbidden
    except Exception as e:
        logging.error(f"Admin check failed for user {user.id}: {e}")
        return False, {'error': f'Error checking admin status: {e}', 'status': 500}
def upload_to_supabase_storage(bucket_name: str, file_path: str, destination_path: str, content_type: str):
    """Upload a local file to Supabase Storage and return its public URL.

    The bucket must allow public access (or switch to signed URLs).
    Re-raises any storage error after logging it.
    """
    if not supabase:
        raise ConnectionError("Supabase client not initialized.")
    try:
        bucket = supabase.storage.from_(bucket_name)
        with open(file_path, 'rb') as fh:
            # upsert=true so re-uploading the same path overwrites the object
            bucket.upload(
                path=destination_path,
                file=fh,
                file_options={"content-type": content_type, "cache-control": "3600", "upsert": "true"},
            )
        return bucket.get_public_url(destination_path)
    except Exception as e:
        logging.error(f"Supabase Storage upload failed: {e}")
        raise  # let the caller decide how to report it
# === Input Content Extraction Helpers ===
def get_pdf_text(pdf_file_storage):
    """Extract text from a PDF file stream.

    Args:
        pdf_file_storage: a binary file-like object containing the PDF.

    Returns:
        Concatenated page text (newline-separated), truncated to MAX_CHARS.

    Raises:
        ValueError: if the PDF cannot be parsed.
    """
    MAX_CHARS = 300000  # Adjust as needed based on Gemini context limits
    try:
        pdf_reader = PdfReader(pdf_file_storage)
        # Collect per-page text and join once — the original concatenated with
        # `text +=` in a loop, which is quadratic on large documents.
        pages = [page.extract_text() for page in pdf_reader.pages]
        text = "\n".join(p for p in pages if p)
        if text:
            text += "\n"  # preserve the original trailing newline
        # Simple truncation (consider smarter chunking for very large PDFs)
        return text[:MAX_CHARS]
    except Exception as e:
        logging.error(f"Error reading PDF: {e}")
        raise ValueError(f"Could not process PDF file: {e}")
def get_youtube_transcript(url):
    """Return the concatenated transcript text of a YouTube video.

    Accepts both 'watch?v=' and 'youtu.be/' style links. Raises ValueError
    when the URL cannot be parsed or no transcript is available.
    """
    MAX_CHARS = 300000 # Adjust as needed
    try:
        if "v=" in url:
            vid = url.split("v=")[1].split("&")[0]
        elif "youtu.be/" in url:
            vid = url.split("youtu.be/")[1].split("?")[0]
        else:
            raise ValueError("Invalid YouTube URL format.")
        segments = YouTubeTranscriptApi.get_transcript(vid)
        full_text = " ".join(seg['text'] for seg in segments)
        return full_text[:MAX_CHARS]
    except Exception as e:
        logging.error(f"Error getting YouTube transcript for {url}: {e}")
        raise ValueError(f"Could not get transcript: {e}")
def get_wiki_content(url):
    """Return the article text for a Wikipedia URL, truncated to MAX_CHARS.

    Raises ValueError for missing pages, disambiguation pages, or any
    other retrieval failure.
    """
    MAX_CHARS = 300000 # Adjust as needed
    try:
        # Derive the page title from the final URL segment (simple approach)
        page_title = urllib.parse.unquote(url.rstrip("/").split("/")[-1]).replace("_", " ")
        wikipedia.set_lang("en") # Or configure based on user preference
        article = wikipedia.page(page_title, auto_suggest=False) # Be specific
        # Full article body; page.summary would be a lighter alternative
        return article.content[:MAX_CHARS]
    except wikipedia.exceptions.PageError:
        raise ValueError(f"Wikipedia page '{page_title}' not found.")
    except wikipedia.exceptions.DisambiguationError as e:
        raise ValueError(f"'{page_title}' refers to multiple pages: {e.options}")
    except Exception as e:
        logging.error(f"Error getting Wikipedia content for {url}: {e}")
        raise ValueError(f"Could not get Wikipedia content: {e}")
def fetch_bible_text(reference):
    """Fetch the text of a Bible passage from bible-api.com (KJV).

    Args:
        reference: a passage reference such as "John 3:16".

    Returns:
        The passage text, truncated to MAX_CHARS.

    Raises:
        ConnectionError: when the HTTP request itself fails.
        ValueError: when the reference is invalid, empty, or the response
            format is unrecognized.
    """
    # This API is simple but might have limitations. Consider alternatives if needed.
    MAX_CHARS = 300000
    try:
        # URL encode the reference
        query = urllib.parse.quote(reference)
        api_url = f"https://bible-api.com/{query}?translation=kjv" # King James Version, change if needed
        response = requests.get(api_url, timeout=15) # Add timeout
        response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
        data = response.json()
        # The API usually returns a flat 'text' field…
        if 'text' in data:
            return data['text'].strip()[:MAX_CHARS]
        if 'error' in data:
            raise ValueError(f"Bible API error: {data['error']}")
        # …but some responses carry a 'verses' list instead.
        if 'verses' in data and isinstance(data['verses'], list):
            text = " ".join(v.get('text', '').strip() for v in data['verses'])
            if not text:
                # BUG FIX: the original *returned* a ValueError instance here
                # instead of raising it, handing callers an exception object.
                raise ValueError("Bible reference not found or empty.")
            return text[:MAX_CHARS]
        raise ValueError("Bible API response format not recognized.")
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching Bible text for '{reference}': {e}")
        raise ConnectionError(f"Could not connect to Bible API: {e}")
    except Exception as e:
        logging.error(f"Error processing Bible reference '{reference}': {e}")
        raise ValueError(f"Could not process Bible reference: {e}")
def get_arxiv_content(arxiv_id):
    """Return (abstract_text, title) for an ArXiv paper ID or URL.

    Raises ValueError when the paper does not exist or the lookup fails.
    """
    MAX_CHARS = 300000 # Adjust as needed
    try:
        # Normalize full abs/pdf URLs down to the bare identifier
        if 'arxiv.org/abs/' in arxiv_id:
            arxiv_id = arxiv_id.split('/abs/')[-1]
        if 'arxiv.org/pdf/' in arxiv_id:
            arxiv_id = arxiv_id.split('/pdf/')[-1].replace('.pdf', '')
        paper = next(arxiv.Search(id_list=[arxiv_id]).results())
        # Abstract only — full PDF extraction is too heavy for this endpoint
        summary_text = f"Title: {paper.title}\n\nAbstract: {paper.summary}"
        return summary_text[:MAX_CHARS], paper.title
    except StopIteration:
        raise ValueError(f"ArXiv paper with ID '{arxiv_id}' not found.")
    except Exception as e:
        logging.error(f"Error fetching ArXiv content for {arxiv_id}: {e}")
        raise ValueError(f"Could not get ArXiv content: {e}")
# === Gemini Interaction Helpers ===
def generate_notes_with_gemini(text_content, title=None):
    """Produce structured Markdown study notes from *text_content* via Gemini.

    Args:
        text_content: the source text to summarize.
        title: optional title woven into the prompt.

    Raises:
        ConnectionError: Gemini client is unavailable.
        RuntimeError: generation failed.
    """
    if not gemini_model:
        raise ConnectionError("Gemini client not initialized.")
    try:
        title_clause = 'titled "' + title + '" ' if title else ''
        prompt = f"""
Act as an expert educator and study assistant. Based on the following text {title_clause} , generate comprehensive and well-structured study notes.
**Instructions:**
1. **Identify Key Concepts:** Extract the main topics, definitions, key figures, dates, arguments, and important takeaways.
2. **Structure Logically:** Organize the notes with clear headings (using Markdown ##) and bullet points (* or -) for readability. Use sub-bullets if necessary.
3. **Be Concise but Thorough:** Summarize the information accurately without unnecessary jargon. Ensure all critical points are covered.
4. **Highlight Importance:** You can use bold text (**bold**) for very important terms or concepts.
5. **Focus:** Generate only the notes based on the provided text. Do not add introductions like "Here are the notes..." or conclusions like "These notes cover...".
**Source Text:**
---
{text_content}
---
**Generated Study Notes:**
"""
        result = gemini_model.generate_content(prompt)
        return result.text.strip()
    except Exception as e:
        logging.error(f"Gemini note generation failed: {e}")
        raise RuntimeError(f"AI failed to generate notes: {e}")
def generate_quiz_with_gemini(notes_content, difficulty, num_questions=5):
    """Generate a multiple-choice quiz (list of question dicts) via Gemini.

    Args:
        notes_content: study notes used as the quiz source material.
        difficulty: 'easy' | 'medium' | 'hard' (unknown values fall back to
            generic medium wording).
        num_questions: how many questions to request.

    Returns:
        list of dicts with keys "question", "options" (A-D) and "correct_answer".

    Raises:
        ConnectionError: Gemini client is unavailable.
        RuntimeError: generation failed or returned unusable JSON.
    """
    if not gemini_model:
        raise ConnectionError("Gemini client not initialized.")
    difficulty_map = {
        "easy": "basic recall and understanding",
        "medium": "application and interpretation",
        "hard": "analysis, synthesis, and evaluation"
    }
    difficulty_desc = difficulty_map.get(difficulty.lower(), "medium difficulty")
    # Pre-bind so the JSONDecodeError handler can always log it — in the
    # original, an early failure left `cleaned_response` unbound there.
    cleaned_response = ""
    try:
        prompt = f"""
Act as an expert quiz creator. Based on the following study notes, create a multiple-choice quiz.
**Instructions:**
1. **Number of Questions:** Generate exactly {num_questions} questions.
2. **Difficulty Level:** The questions should be of {difficulty_desc} ({difficulty}).
3. **Format:** Each question must have exactly four options (A, B, C, D).
4. **Clarity:** Questions and options should be clear and unambiguous.
5. **Single Correct Answer:** Ensure only one option is the correct answer.
6. **JSON Output:** Format the entire output STRICTLY as a JSON list of objects. Each object must have the following keys: "question" (string), "options" (an object with keys "A", "B", "C", "D", all strings), and "correct_answer" (string, either "A", "B", "C", or "D").
7. **Focus:** Generate only the JSON output. Do not include any introductory text, explanations, or markdown formatting outside the JSON structure.
**Study Notes:**
---
{notes_content}
---
**Quiz JSON Output:**
```json
[
{{
"question": "...",
"options": {{
"A": "...",
"B": "...",
"C": "...",
"D": "..."
}},
"correct_answer": "..."
}}
// ... more question objects
]
```
"""
        response = gemini_model.generate_content(prompt)
        raw = response.text.strip()
        # BUG FIX: the original used raw.lstrip('```json'), but str.lstrip
        # strips a *character set* ({`, j, s, o, n}), not a prefix, so it
        # could silently eat leading characters of the payload itself.
        # Strip an optional markdown code fence explicitly instead.
        fence = re.match(r'^```(?:json)?\s*(.*?)\s*```$', raw, re.DOTALL)
        cleaned_response = fence.group(1) if fence else raw
        # Validate and parse JSON
        quiz_data = json.loads(cleaned_response)
        # Basic structural validation before handing the data to callers
        if not isinstance(quiz_data, list):
            raise ValueError("AI response is not a list.")
        if quiz_data and not all(k in quiz_data[0] for k in ["question", "options", "correct_answer"]):
            raise ValueError("AI response list items have missing keys.")
        return quiz_data
    except json.JSONDecodeError as e:
        logging.error(f"Gemini quiz generation returned invalid JSON: {cleaned_response[:500]}... Error: {e}")
        raise RuntimeError(f"AI failed to generate a valid quiz format. Please try again.")
    except Exception as e:
        logging.error(f"Gemini quiz generation failed: {e}")
        raise RuntimeError(f"AI failed to generate quiz: {e}")
# === Authentication Endpoints ===
@app.route('/api/auth/signup', methods=['POST'])
def signup():
    """Register a new user with Supabase Auth and seed a profile row.

    JSON body: {"email": ..., "password": ...}. On success returns 201 with
    the new user id (or null when email confirmation is pending and no user
    object is returned). New profiles start with 20 credits.
    """
    if not supabase: return jsonify({'error': 'Service unavailable'}), 503
    try:
        data = request.get_json()
        email = data.get('email')
        password = data.get('password')
        if not email or not password:
            return jsonify({'error': 'Email and password are required'}), 400
        res = supabase.auth.sign_up({"email": email, "password": password})
        # BUG FIX: the original dereferenced res.user.id unconditionally even
        # though the response line below admits res.user may be None (e.g.
        # when confirmation is required) — guard before touching .id.
        if res.user:
            # Ensure a profile row exists with the signup bonus credits
            supabase.table('profiles').upsert({
                'id': res.user.id,
                'email': email,
                'credits': 20
            }).execute()
        return jsonify({
            'success': True,
            'message': 'Signup successful. Please check your email for verification.',
            'user_id': res.user.id if res.user else None
        }), 201
    except Exception as e:
        # Map the common duplicate-account failure to a friendlier message
        error_message = str(e)
        status_code = 400
        if "User already registered" in error_message:
            error_message = "Email already exists."
            status_code = 409
        logging.error(f"Signup error: {error_message}")
        return jsonify({'error': error_message}), status_code
@app.route('/api/auth/signin', methods=['POST'])
def signin():
    """Authenticate with email/password; return session tokens plus profile."""
    if not supabase: return jsonify({'error': 'Service unavailable'}), 503
    try:
        payload = request.get_json()
        email = payload.get('email')
        password = payload.get('password')
        if not (email and password):
            return jsonify({'error': 'Email and password are required'}), 400
        # Sign in through Supabase Auth
        auth_res = supabase.auth.sign_in_with_password({"email": email, "password": password})
        # Pull the matching 'profiles' row so the client gets credits/flags too
        profile_res = supabase.table('profiles').select('*').eq('id', auth_res.user.id).maybe_single().execute()
        return jsonify({
            'success': True,
            'access_token': auth_res.session.access_token,
            'refresh_token': auth_res.session.refresh_token,
            'user': {
                'id': auth_res.user.id,
                'email': auth_res.user.email,
                'profile': profile_res.data # Include profile details
            }
        }), 200
    except Exception as e:
        # Map common Supabase auth failures to friendlier client messages
        error_message = str(e)
        status_code = 401 # Unauthorized
        if "Invalid login credentials" in error_message:
            error_message = "Invalid email or password."
        elif "Email not confirmed" in error_message:
            error_message = "Please verify your email address before signing in."
            status_code = 403 # Forbidden
        logging.error(f"Signin error: {error_message}")
        return jsonify({'error': error_message}), status_code
@app.route('/api/auth/google-signin', methods=['POST'])
def google_signin():
    """Verify a Supabase session token produced by a Google OAuth sign-in.

    The frontend completes the OAuth flow itself; this endpoint validates the
    resulting token and makes sure a 'profiles' row exists for the user.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    try:
        lookup = supabase.table('profiles').select('*').eq('id', user.id).maybe_single().execute()
        profile_data = lookup.data
        if not profile_data:
            # Shouldn't happen if the DB trigger works — create defensively.
            logging.warning(f"Google Sign-In: Profile not found for verified user {user.id}, attempting to create.")
            # Attempt to create profile (might fail if email exists from password signup)
            created = supabase.table('profiles').insert({
                'id': user.id,
                'email': user.email,
                # Set default credits/roles if needed
            }).execute()
            profile_data = created.data[0] if created.data else None
            if not profile_data:
                raise Exception("Failed to create profile entry after Google Sign-In.")
        # Frontend manages its own session; we only confirm identity here.
        return jsonify({
            'success': True,
            'message': 'Google sign-in verified successfully.',
            'user': {
                'id': user.id,
                'email': user.email,
                'profile': profile_data
            }
        }), 200
    except Exception as e:
        logging.error(f"Google sign-in profile fetch/creation error: {e}")
        return jsonify({'error': f'An error occurred during sign-in: {e}'}), 500
# === User Profile Endpoint ===
@app.route('/api/user/profile', methods=['GET'])
def get_user_profile():
    """Return the authenticated user's combined auth + profile record."""
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    try:
        result = supabase.table('profiles').select('*').eq('id', user.id).maybe_single().execute()
        if not result.data:
            # User exists in auth but has no profile row — data integrity issue
            logging.error(f"Profile not found for authenticated user: {user.id} / {user.email}")
            return jsonify({'error': 'User profile not found.'}), 404
        profile = result.data
        # Merge auth identity with profile fields; auth email wins.
        return jsonify({
            'id': user.id,
            'email': user.email, # Email from auth is usually the source of truth
            'credits': profile.get('credits'),
            'is_admin': profile.get('is_admin'),
            'created_at': profile.get('created_at'),
            'suspended': profile.get('suspended')
        }), 200
    except Exception as e:
        logging.error(f"Error fetching user profile for {user.id}: {e}")
        return jsonify({'error': f'Failed to fetch profile: {e}'}), 500
# === AI Tutor Core Endpoints ===
@app.route('/api/tutor/process_input', methods=['POST'])
def process_input_and_generate_notes():
    """Ingest one learning source, generate AI study notes, and persist both.

    Expects multipart/form-data with:
      - input_type: one of pdf, youtube, wiki, bible, arxiv, text
      - source_ref: URL / Bible reference / ArXiv ID (type-dependent)
      - file: the PDF upload (input_type == 'pdf' only)
      - text_content: raw text (input_type == 'text' only)

    Charges the user 2 credits on success. Returns 201 with the new
    material_id, notes_id and the generated notes text.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error: return jsonify({'error': error['error']}), error['status']
    if not supabase or not gemini_model: return jsonify({'error': 'Backend service unavailable'}), 503
    # Gate on account status and credit balance before doing any expensive work.
    profile_res = supabase.table('profiles').select('credits', 'suspended').eq('id', user.id).single().execute()
    if profile_res.data['suspended']:
        return jsonify({'error': 'Account suspended'}), 403
    if profile_res.data['credits'] < 2:
        return jsonify({'error': 'Insufficient credits (Need 2)'}), 402
    try:
        input_type = request.form.get('input_type')
        source_ref = request.form.get('source_ref') # URL, Bible ref, ArXiv ID, etc.
        uploaded_file = request.files.get('file') # For PDF
        if not input_type:
            return jsonify({'error': 'input_type (e.g., pdf, youtube, wiki, bible, arxiv, text) is required'}), 400
        content = None
        title = None # Optional title
        # Dispatch on input type to the matching extraction helper.
        if input_type == 'pdf':
            if not uploaded_file: return jsonify({'error': 'File is required for input_type pdf'}), 400
            if not uploaded_file.filename.lower().endswith('.pdf'): return jsonify({'error': 'Only PDF files are allowed'}), 400
            content = get_pdf_text(uploaded_file.stream)
            source_ref = uploaded_file.filename # Use filename as reference
            title = uploaded_file.filename
        elif input_type == 'youtube':
            if not source_ref: return jsonify({'error': 'source_ref (YouTube URL) is required'}), 400
            content = get_youtube_transcript(source_ref)
            # You could fetch the video title using youtube API/libraries if needed
        elif input_type == 'wiki':
            if not source_ref: return jsonify({'error': 'source_ref (Wikipedia URL) is required'}), 400
            content = get_wiki_content(source_ref)
            # Title derived the same way get_wiki_content derives the page name
            title = urllib.parse.unquote(source_ref.rstrip("/").split("/")[-1]).replace("_", " ")
        elif input_type == 'bible':
            if not source_ref: return jsonify({'error': 'source_ref (Bible reference) is required'}), 400
            content = fetch_bible_text(source_ref)
            title = source_ref
        elif input_type == 'arxiv':
            if not source_ref: return jsonify({'error': 'source_ref (ArXiv ID or URL) is required'}), 400
            content, title = get_arxiv_content(source_ref) # Gets title too
        elif input_type == 'text':
            content = request.form.get('text_content')
            if not content: return jsonify({'error': 'text_content is required for input_type text'}), 400
            source_ref = content[:100] + "..." # Use beginning of text as ref
            title = "Custom Text"
        else:
            return jsonify({'error': f'Unsupported input_type: {input_type}'}), 400
        if not content:
            return jsonify({'error': 'Failed to extract content from the source.'}), 500
        # --- Generate Notes ---
        start_time = time.time()
        logging.info(f"Generating notes for user {user.id}, type: {input_type}, ref: {source_ref[:50]}")
        generated_notes = generate_notes_with_gemini(content, title=title)
        logging.info(f"Notes generation took {time.time() - start_time:.2f}s")
        # --- Save to Database ---
        # 1. Save Study Material
        material_res = supabase.table('study_materials').insert({
            'user_id': user.id,
            'type': input_type,
            'source_ref': source_ref,
            'source_content': content if len(content) < 10000 else content[:10000] + "... (truncated)", # Optionally save truncated content
            'title': title
        }).execute()
        if not material_res.data: raise Exception(f"Failed to save study material: {material_res.error}")
        material_id = material_res.data[0]['id']
        # 2. Save Notes linked to Material
        notes_res = supabase.table('notes').insert({
            'material_id': material_id,
            'user_id': user.id,
            'content': generated_notes
        }).execute()
        if not notes_res.data: raise Exception(f"Failed to save generated notes: {notes_res.error}")
        notes_id = notes_res.data[0]['id']
        # --- Deduct Credits ---
        # NOTE(review): read-then-write is not atomic; two concurrent requests
        # could double-spend — consider an atomic RPC decrement. TODO confirm.
        new_credits = profile_res.data['credits'] - 2
        supabase.table('profiles').update({'credits': new_credits}).eq('id', user.id).execute()
        return jsonify({
            'success': True,
            'message': 'Content processed and notes generated successfully.',
            'material_id': material_id,
            'notes_id': notes_id,
            'notes': generated_notes # Return notes directly for immediate use
        }), 201
    except ValueError as e: # Input validation errors
        logging.warning(f"Input processing error for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 400
    except ConnectionError as e: # Service unavailable (Supabase, Gemini, etc.)
        logging.error(f"Connection error during processing: {e}")
        return jsonify({'error': f'A backend service is unavailable: {e}'}), 503
    except RuntimeError as e: # AI generation errors
        logging.error(f"RuntimeError during processing for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 500
    except Exception as e:
        logging.error(f"Unexpected error processing input for user {user.id}: {traceback.format_exc()}")
        return jsonify({'error': f'An unexpected error occurred: {e}'}), 500
@app.route('/api/view/notes/<uuid:note_id>', methods=['GET'])
def get_note_by_id(note_id):
    """Fetch a single note (with its source material) owned by the caller."""
    try:
        # --- Authentication ---
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']
        # Join the parent study_materials row in the same query
        note_res = (
            supabase.table('notes')
            .select('id, content, created_at, tts_audio_url, study_materials(title, type, source_ref)')
            .eq('id', note_id)
            .eq('user_id', user.id)
            .maybe_single()
            .execute()
        )
        if not note_res.data:
            return jsonify({'error': 'Note not found or unauthorized'}), 404
        note = note_res.data
        material = note['study_materials']
        return jsonify({
            "note": {
                "note_id": note['id'],
                "content": note['content'],
                "audio_url": note['tts_audio_url'],
                "created_at": note['created_at'],
                "material": {
                    "title": material['title'] if material else "Untitled",
                    "type": material['type'] if material else None,
                    "source_ref": material['source_ref'] if material else None
                }
            }
        })
    except Exception as e:
        logging.error(f"Error fetching note {note_id}: {str(e)}")
        return jsonify({'error': 'Internal server error'}), 500
@app.route('/api/tutor/notes/<uuid:notes_id>/generate_quiz', methods=['POST'])
def generate_quiz_for_notes(notes_id):
    """Create a multiple-choice quiz from an existing note's content.

    JSON body: {"difficulty": "easy|medium|hard", "num_questions": 1..10}.
    Charges 2 credits; stores the quiz and returns its questions.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error: return jsonify({'error': error['error']}), error['status']
    if not supabase or not gemini_model: return jsonify({'error': 'Backend service unavailable'}), 503
    # Gate on account status and credit balance before any expensive work.
    profile_res = supabase.table('profiles').select('credits', 'suspended').eq('id', user.id).single().execute()
    if profile_res.data['suspended']:
        return jsonify({'error': 'Account suspended'}), 403
    if profile_res.data['credits'] < 2:
        return jsonify({'error': 'Insufficient credits (Need 2)'}), 402
    try:
        data = request.get_json()
        difficulty = data.get('difficulty', 'medium').lower()
        num_questions = int(data.get('num_questions', 5))
        if difficulty not in ['easy', 'medium', 'hard']:
            return jsonify({'error': 'difficulty must be easy, medium, or hard'}), 400
        if not 1 <= num_questions <= 10:
            return jsonify({'error': 'num_questions must be between 1 and 10'}), 400
        # --- Fetch Notes Content ---
        notes_res = supabase.table('notes').select('content, user_id').eq('id', notes_id).maybe_single().execute()
        if not notes_res.data:
            return jsonify({'error': 'Notes not found'}), 404
        # Ensure user owns the notes
        if notes_res.data['user_id'] != user.id:
            return jsonify({'error': 'You do not have permission to access these notes'}), 403
        notes_content = notes_res.data['content']
        # --- Generate Quiz ---
        start_time = time.time()
        logging.info(f"Generating {difficulty} quiz ({num_questions}q) for user {user.id}, notes: {notes_id}")
        quiz_questions = generate_quiz_with_gemini(notes_content, difficulty, num_questions)
        logging.info(f"Quiz generation took {time.time() - start_time:.2f}s")
        # --- Save Quiz to Database ---
        quiz_res = supabase.table('quizzes').insert({
            'notes_id': str(notes_id),
            'user_id': user.id,
            'difficulty': difficulty,
            'questions': json.dumps(quiz_questions) # Store questions as JSONB
        }).execute()
        if not quiz_res.data: raise Exception(f"Failed to save generated quiz: {quiz_res.error}")
        quiz_id = quiz_res.data[0]['id']
        # Deduct the 2-credit fee.
        # NOTE(review): read-modify-write, not atomic under concurrency.
        new_credits = profile_res.data['credits'] - 2
        supabase.table('profiles').update({'credits': new_credits}).eq('id', user.id).execute()
        return jsonify({
            'success': True,
            'quiz_id': quiz_id,
            'difficulty': difficulty,
            'questions': quiz_questions # Return quiz data for immediate use
        }), 201
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except ConnectionError as e:
        logging.error(f"Connection error during quiz generation: {e}")
        return jsonify({'error': f'A backend service is unavailable: {e}'}), 503
    except RuntimeError as e: # AI generation errors
        logging.error(f"RuntimeError during quiz generation for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 500
    except Exception as e:
        logging.error(f"Unexpected error generating quiz for user {user.id}, notes {notes_id}: {traceback.format_exc()}")
        return jsonify({'error': f'An unexpected error occurred: {e}'}), 500
@app.route('/api/view/quizzes/<uuid:quiz_id>', methods=['GET'])
def get_quiz_by_id(quiz_id):
    """Fetch one quiz (with a preview of its source note) owned by the caller."""
    try:
        # --- Authentication ---
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']
        # Nested select pulls the parent note and its material in one round trip
        quiz_res = (
            supabase.table('quizzes')
            .select('''id, difficulty, created_at, questions,
notes(id, content, study_materials(title, type))''')
            .eq('id', quiz_id)
            .eq('user_id', user.id)
            .maybe_single()
            .execute()
        )
        if not quiz_res.data:
            return jsonify({'error': 'Quiz not found or unauthorized'}), 404
        quiz = quiz_res.data
        note = quiz['notes']
        material = note['study_materials']
        return jsonify({
            "quiz": {
                "quiz_id": quiz['id'],
                "difficulty": quiz['difficulty'],
                "created_at": quiz['created_at'],
                "questions": quiz['questions'],
                "source_note": {
                    "note_id": note['id'],
                    "content_preview": note['content'][:100] + "..." if note['content'] else None,
                    "material": {
                        "title": material['title'] if material else None,
                        "type": material['type'] if material else None
                    }
                }
            }
        })
    except Exception as e:
        logging.error(f"Error fetching quiz {quiz_id}: {str(e)}")
        return jsonify({'error': 'Internal server error'}), 500
def _grade_quiz(questions, submitted):
    """Score submitted answers against a quiz's questions.

    Mirrors the endpoint's original inline grading loop exactly: a question
    is only graded (and only appears in feedback/correct_answers) when both
    a submitted answer and a stored correct answer are present.
    Returns (correct_count, correct_answers, feedback).
    """
    correct_count = 0
    correct_answers = {}
    feedback = {}
    for index, question in enumerate(questions):
        qid = str(question.get('id', str(index)))  # positional fallback id
        given = submitted.get(qid)
        expected = question.get('correct_answer')
        if given and expected:
            hit = given.upper() == expected.upper()
            if hit:
                correct_count += 1
            correct_answers[qid] = expected
            feedback[qid] = {
                "correct": hit,
                "correct_answer": expected,
                "user_answer": given
            }
    return correct_count, correct_answers, feedback
@app.route('/api/tutor/quizzes/<uuid:quiz_id>/submit', methods=['POST'])
def submit_quiz_attempt(quiz_id):
    """Submits user answers for a quiz and calculates the score."""
    user, error = verify_token(request.headers.get('Authorization'))
    if error: return jsonify({'error': error['error']}), error['status']
    if not supabase: return jsonify({'error': 'Backend service unavailable'}), 503
    try:
        payload = request.get_json()
        user_answers = payload.get('answers') # Expected format: { "questionId": "A", ... }
        if not isinstance(user_answers, dict):
            return jsonify({'error': 'answers must be provided as a JSON object'}), 400
        # Fetch the quiz being answered
        quiz_res = supabase.table('quizzes')\
            .select('questions, user_id')\
            .eq('id', quiz_id)\
            .maybe_single()\
            .execute()
        if not quiz_res.data:
            return jsonify({'error': 'Quiz not found'}), 404
        quiz_questions = quiz_res.data['questions']
        if isinstance(quiz_questions, str):
            quiz_questions = json.loads(quiz_questions)  # stored as JSON text
        total_questions = len(quiz_questions)
        correct_count, correct_answers, feedback = _grade_quiz(quiz_questions, user_answers)
        score = (correct_count / total_questions) * 100 if total_questions > 0 else 0.0
        # Persist the attempt for history/leaderboards
        attempt_res = supabase.table('quiz_attempts').insert({
            'quiz_id': str(quiz_id),
            'user_id': user.id,
            'score': score,
            'answers': json.dumps(user_answers)
        }).execute()
        return jsonify({
            'success': True,
            'attempt_id': attempt_res.data[0]['id'],
            'score': round(score, 2),
            'correct_count': correct_count,
            'total_questions': total_questions,
            'correct_answers': correct_answers, # Send back correct answers
            'feedback': feedback
        }), 201
    except Exception as e:
        logging.error(f"Error submitting quiz: {traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
# Modified speak_notes endpoint with ElevenLabs Studio API and chunking
# Optional dependency: pydub (which itself needs ffmpeg) is used to
# concatenate audio chunks. Import it defensively so the rest of the app
# still starts without it; speak_notes checks PYDUB_AVAILABLE first.
try:
    from pydub import AudioSegment
    PYDUB_AVAILABLE = True
except ImportError:
    PYDUB_AVAILABLE = False
    logging.warning("pydub library not found or ffmpeg might be missing. Audio chunk concatenation will fail. Please install pydub and ensure ffmpeg is in your system's PATH.")
    # Define a dummy AudioSegment class if pydub is not installed to avoid NameError later
    # Any attempted use raises ImportError with an actionable message.
    class AudioSegment:
        @staticmethod
        def from_file(*args, **kwargs):
            raise ImportError("pydub/ffmpeg not installed or accessible")
        def __add__(self, other):
            raise ImportError("pydub/ffmpeg not installed or accessible")
        def export(self, *args, **kwargs):
            raise ImportError("pydub/ffmpeg not installed or accessible")
def generate_tts_audio(text_to_speak, voice_id="Rachel"):
    """Synthesize *text_to_speak* with ElevenLabs and return the MP3 bytes.

    Raises:
        ConnectionError: if the module-level ElevenLabs client was never set up.
        RuntimeError: if synthesis fails or produces no audio.
    """
    if not elevenlabs_client:
        raise ConnectionError("ElevenLabs client not initialized.")
    try:
        response = elevenlabs_client.generate(
            text=text_to_speak,
            voice=voice_id,
            model="eleven_multilingual_v2",
            stream=False,
        )
        # The client yields the audio as a sequence of byte chunks; fold them
        # into a single bytes object.
        audio_bytes = b"".join(response)
        if not audio_bytes:
            raise ValueError("ElevenLabs generated empty audio.")
        return audio_bytes
    except Exception as e:
        logging.error(f"ElevenLabs TTS generation failed: {e}")
        raise RuntimeError(f"Failed to generate audio: {e}")
@app.route('/api/tutor/notes/<uuid:notes_id>/speak', methods=['POST'])
def speak_notes(notes_id):
    """
    Generate TTS audio for notes using ElevenLabs,
    combine chunks using pydub, and store the final MP3 in Supabase Storage.
    Updates the note record with the audio URL and deducts credits.
    Rejects requests for content over 10,000 characters.

    Path params:
        notes_id (uuid.UUID): id of the note to vocalize; must belong to the caller.

    Returns (JSON):
        200 with {'success', 'audio_url', 'remaining_credits'} on success (or when
        a previously generated audio URL already exists); 4xx/5xx error payloads
        otherwise ({'error': ...}).
    """
    # Fail fast if the audio-concatenation dependency is unavailable (see module-level probe).
    if not PYDUB_AVAILABLE:
        logging.error("Audio processing library (pydub/ffmpeg) check failed.")
        return jsonify({'error': 'Server configuration error: Audio processing library not available.'}), 500
    # 0. Authenticate User
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    if not supabase or not elevenlabs_client:
        logging.error("Backend service (Supabase or ElevenLabs client) not initialized.")
        return jsonify({'error': 'Backend service unavailable'}), 503
    try:
        # 1. Verify note ownership and get content
        logging.info(f"Processing speak request for note {notes_id} by user {user.id}")
        note_res = supabase.table('notes') \
            .select('user_id, content, tts_audio_url') \
            .eq('id', str(notes_id)) \
            .eq('user_id', user.id) \
            .maybe_single() \
            .execute()
        if not note_res.data:
            logging.warning(f"Note {notes_id} not found or unauthorized for user {user.id}.")
            return jsonify({'error': 'Note not found or unauthorized'}), 404
        # 2. Check user status and credits
        profile_res = supabase.table('profiles') \
            .select('credits, suspended') \
            .eq('id', user.id) \
            .single() \
            .execute()
        # Check for potential errors from profile fetch itself if needed
        if not profile_res.data:
            logging.error(f"Could not fetch profile for user {user.id}")
            return jsonify({'error': 'Failed to retrieve user profile'}), 500
        if profile_res.data.get('suspended'):
            logging.warning(f"User {user.id} account is suspended.")
            return jsonify({'error': 'Account suspended'}), 403
        current_credits = profile_res.data.get('credits', 0)
        CREDIT_COST = 5  # flat credit cost per audio generation
        if current_credits < CREDIT_COST:
            # NOTE(review): this check runs BEFORE the existing-audio short-circuit
            # below, so a low-credit user is refused even cached audio that would
            # cost nothing — confirm this ordering is intentional.
            logging.warning(f"User {user.id} has insufficient credits ({current_credits}/{CREDIT_COST}).")
            return jsonify({'error': f'Insufficient credits (Need {CREDIT_COST})'}), 402
        # 3. Return existing audio if available (and skip generation/deduction)
        existing_audio_url = note_res.data.get('tts_audio_url')
        if existing_audio_url:
            logging.info(f"Using existing audio URL for note {notes_id}: {existing_audio_url}")
            return jsonify({
                'success': True,
                'audio_url': existing_audio_url,
                'message': 'Using existing audio file',
                'remaining_credits': current_credits  # Return current credits as none were deducted
            })
        notes_content = note_res.data.get('content')
        if not notes_content or not notes_content.strip():
            logging.warning(f"Note {notes_id} content is empty.")
            return jsonify({'error': 'Notes content is empty'}), 400
        # Check for character limit (10,000 characters)
        if len(notes_content) > 10000:
            logging.warning(f"Note {notes_id} content exceeds 10,000 character limit ({len(notes_content)} chars).")
            return jsonify({
                'error': 'Content exceeds maximum length',
                'message': f'Note content is {len(notes_content)} characters. Maximum allowed is 10,000 characters.'
            }), 413
        # 4. Generate TTS Audio with chunking (still need chunking for long texts)
        # ElevenLabs v2 non-streaming limit is often around 2500 chars, but check docs.
        CHUNK_SIZE = 2500
        text_chunks = [notes_content[i:i+CHUNK_SIZE] for i in range(0, len(notes_content), CHUNK_SIZE)]
        combined_audio_segment = None  # pydub AudioSegment accumulated across chunks
        logging.info(f"Generating audio for note {notes_id} in {len(text_chunks)} chunks.")
        for i, chunk in enumerate(text_chunks):
            try:
                logging.debug(f"Generating audio for chunk {i+1}/{len(text_chunks)}...")
                # Use the new generate_tts_audio function
                chunk_audio_bytes = generate_tts_audio(
                    text_to_speak=chunk.strip(),  # Ensure no leading/trailing whitespace in chunk
                    voice_id="Rachel"  # Or your desired voice ID
                )
                if not chunk_audio_bytes:
                    logging.warning(f"TTS generation returned empty audio for chunk {i+1} of note {notes_id}")
                    continue  # Skip this chunk, maybe log or handle differently if needed
                # Load chunk audio bytes into pydub AudioSegment using BytesIO
                segment = AudioSegment.from_file(BytesIO(chunk_audio_bytes), format="mp3")
                # Combine segments
                if combined_audio_segment is None:
                    combined_audio_segment = segment
                else:
                    combined_audio_segment += segment  # Append segment
                logging.debug(f"Successfully processed chunk {i+1}/{len(text_chunks)}")
            except ImportError as e:
                logging.error(f"pydub/ffmpeg error during chunk processing: {e}")
                raise e  # Re-raise to be caught by the outer ImportError handler
            except Exception as e:
                logging.error(f"Error generating/processing audio chunk {i+1} for note {notes_id}: {str(e)}")
                # Stop the process if a chunk fails
                raise RuntimeError(f"Audio generation/processing failed for chunk {i+1}: {str(e)}")
        if combined_audio_segment is None:
            # This could happen if all chunks failed or the content was only whitespace
            logging.error(f"Failed to generate any audio content for note {notes_id}.")
            raise RuntimeError("Failed to generate any audio content.")
        # Export combined audio to final bytes
        output_bytes_io = BytesIO()
        combined_audio_segment.export(output_bytes_io, format="mp3")
        final_audio_bytes = output_bytes_io.getvalue()  # Get the raw 'bytes' data
        if not final_audio_bytes:
            logging.error(f"Generated empty final audio file after combining chunks for note {notes_id}.")
            raise RuntimeError("Generated empty final audio file after combining chunks.")
        logging.info(f"Audio generation complete for note {notes_id}. Total size: {len(final_audio_bytes)} bytes.")
        # 5. Save to Supabase Storage
        bucket_name = 'notes-audio'  # Ensure this bucket exists and has correct policies
        # Use user ID and note ID for a unique, organized path
        file_path = f'{user.id}/{str(notes_id)}.mp3'
        audio_url = None  # Initialize audio_url so the outer handler can tell whether upload succeeded
        try:
            logging.info(f"Uploading audio to Supabase Storage: {bucket_name}/{file_path}")
            # Upload the final combined audio bytes. Use upsert=true to overwrite if regenerating.
            supabase.storage.from_(bucket_name).upload(
                path=file_path,
                file=final_audio_bytes,  # Pass the raw 'bytes' object
                file_options={"content-type": "audio/mpeg", "upsert": "true"}
            )
            # Note: supabase-py v1 might raise StorageException on failure.
            # v2 might return a response object to check. Adapt error checking if needed.
            # Get public URL (make sure RLS policies allow public reads or generate signed URL)
            public_url_data = supabase.storage.from_(bucket_name).get_public_url(file_path)
            # Assuming the URL is directly in the response data
            audio_url = public_url_data
            if not audio_url:
                # This case indicates an issue with getting the URL after a successful upload
                logging.error(f"Upload to {file_path} seemed successful, but failed to get public URL.")
                raise ConnectionError("Failed to retrieve audio URL after upload.")
            logging.info(f"Audio uploaded successfully for note {notes_id}. URL: {audio_url}")
            # --- Database Updates and Credit Deduction ---
            # Wrap these in a try/except block for potential rollback on failure
            try:
                # 6. Update notes table with the audio URL
                logging.debug(f"Updating notes table for note {notes_id} with URL.")
                update_res = supabase.table('notes') \
                    .update({'tts_audio_url': audio_url}) \
                    .eq('id', str(notes_id)) \
                    .eq('user_id', user.id) \
                    .execute()
                # Basic check if response indicates data was modified (adapt based on client version)
                if not update_res.data:
                    logging.warning(f"Note update query executed for {notes_id} but no data returned (might be ok, or indicate issue).")
                    # Consider stronger checks based on specific client behavior on error/no-update
                # 7. Deduct credits
                new_credits = current_credits - CREDIT_COST
                logging.debug(f"Deducting {CREDIT_COST} credits for user {user.id}. New balance: {new_credits}")
                credit_res = supabase.table('profiles') \
                    .update({'credits': new_credits}) \
                    .eq('id', user.id) \
                    .execute()
                # Basic check for credit update
                if not credit_res.data:
                    # CRITICAL: Failed to deduct credits after upload/URL update.
                    logging.error(f"CRITICAL: Failed to deduct credits for user {user.id} after audio generation for note {notes_id}.")
                    # Decide handling: Log and proceed? Attempt rollback?
                    # For now, log error and return success as audio is generated, but flag the inconsistency.
                    # Ideally, implement transactional logic or robust cleanup.
                logging.info(f"Successfully updated database and deducted credits for note {notes_id}")
                return jsonify({
                    'success': True,
                    'audio_url': audio_url,
                    'remaining_credits': new_credits
                })
            except Exception as db_error:
                # Error occurred during DB update/credit deduction AFTER successful upload
                logging.error(f"Database update/credit deduction failed for note {notes_id} AFTER upload: {str(db_error)}. URL was {audio_url}")
                logging.info(f"Attempting to clean up uploaded file: {file_path}")
                # Attempt to clean up the uploaded file since DB update failed
                try:
                    supabase.storage.from_(bucket_name).remove([file_path])
                    logging.info(f"Successfully cleaned up orphaned file: {file_path}")
                except Exception as cleanup_error:
                    logging.error(f"Failed to clean up orphaned file {file_path} after DB error: {cleanup_error}")
                # Re-raise the database error to signal the overall operation failed
                raise db_error
        except Exception as upload_db_error:
            # This catches errors during upload OR the subsequent DB operations block if re-raised
            logging.error(f"Error during upload or DB update phase for note {notes_id}: {str(upload_db_error)}")
            # Attempt cleanup if file might have been uploaded and URL obtained before the error
            if audio_url:  # Check if upload likely succeeded before the error
                try:
                    logging.info(f"Attempting cleanup for failed operation: {file_path}")
                    supabase.storage.from_(bucket_name).remove([file_path])
                    logging.info(f"Cleanup successful for {file_path}")
                except Exception as cleanup_error:
                    # Log if cleanup also fails, but report the original error
                    logging.error(f"Upload/DB error occurred, AND cleanup failed for {file_path}: {cleanup_error}")
            # Re-raise the original error that caused the failure
            raise upload_db_error
    except ImportError as e:
        # Catch the specific ImportError from the pydub check/usage
        logging.error(f"Missing dependency error: {e}")
        return jsonify({'error': 'Server configuration error: Audio library (pydub/ffmpeg) missing or failed.'}), 500
    except (RuntimeError, ConnectionError) as e:
        # Catch specific errors we raised for generation/upload/db issues
        logging.error(f"Operation failed for note {notes_id}: {str(e)}")
        return jsonify({'error': str(e)}), 500  # Return the specific error message
    except Exception as e:
        # Catch any other unexpected errors
        logging.error(f"Unexpected speak endpoint error for note {notes_id}: {traceback.format_exc()}")
        # Return a generic error message to the client for unknown errors
        return jsonify({'error': 'An unexpected error occurred during audio generation.'}), 500
# New endpoint to view existing audio URL
@app.route('/api/tutor/notes/<uuid:notes_id>/audio', methods=['GET'])
def get_note_audio(notes_id):
    """Return the previously generated TTS audio URL for a note owned by the caller."""
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    try:
        lookup = supabase.table('notes').select('tts_audio_url, user_id').eq('id', notes_id).single().execute()
        record = lookup.data
        if not record:
            return jsonify({'error': 'Notes not found'}), 404
        # Ownership check: only the note's author may fetch its audio.
        if record['user_id'] != user.id:
            return jsonify({'error': 'Unauthorized access'}), 403
        audio_url = record['tts_audio_url']
        if not audio_url:
            return jsonify({'error': 'No audio available for these notes'}), 404
        return jsonify({'success': True, 'audio_url': audio_url})
    except Exception as e:
        logging.error(f"Error getting audio URL: {str(e)}")
        return jsonify({'error': str(e)}), 500
# ---------- View Notes and Quizzes Endpoints ----------
@app.route('/api/view/notes', methods=['GET'])
def view_notes():
    """List the caller's notes, newest first, including linked study-material metadata."""
    try:
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']
        result = supabase.table('notes') \
            .select('id, content, created_at, tts_audio_url, study_materials(title, type)') \
            .eq('user_id', user.id) \
            .order('created_at', desc=True) \
            .execute()
        if hasattr(result, 'error') and result.error:
            raise Exception(result.error.message)
        # Reshape rows into the structure the frontend expects.
        notes = [
            {
                "note_id": row['id'],
                "content": row['content'],
                "audio_url": row['tts_audio_url'],
                "created_at": row['created_at'],
                "material_title": row['study_materials']['title'] if row['study_materials'] else "Untitled Note",
                "material_type": row['study_materials']['type'] if row['study_materials'] else None,
            }
            for row in result.data
        ]
        return jsonify({"notes": notes})
    except Exception as e:
        print(f"Error in /api/view/notes: {str(e)}")  # Debug logging
        logging.error(f"Notes endpoint error: {str(e)}")
        logging.error(traceback.format_exc())
        return jsonify({'error': str(e)}), 500
@app.route('/api/view/quizzes', methods=['GET'])
def view_quizzes():
    """List the caller's quizzes, newest first, with a note preview and material metadata."""
    try:
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']
        result = supabase.table('quizzes') \
            .select('id, difficulty, created_at, notes(content, study_materials(title, type))') \
            .eq('user_id', user.id) \
            .order('created_at', desc=True) \
            .execute()
        if hasattr(result, 'error') and result.error:
            raise Exception(result.error.message)
        # Reshape rows into the structure the frontend expects.
        quizzes = []
        for row in result.data:
            note = row['notes']
            material = note['study_materials'] if note else None
            quizzes.append({
                "quiz_id": row['id'],
                "difficulty": row['difficulty'],
                "created_at": row['created_at'],
                "notes_preview": note['content'][:100] + "..." if note and note['content'] else None,
                "material_title": material['title'] if material else "Untitled Quiz",
                "material_type": material['type'] if material else None,
            })
        return jsonify({"quizzes": quizzes})
    except Exception as e:
        print(f"Error in /api/view/quizzes: {str(e)}")  # Debug logging
        logging.error(f"Quizzes endpoint error: {str(e)}")
        logging.error(traceback.format_exc())
        return jsonify({'error': str(e)}), 500
@app.route('/api/user/performance', methods=['GET'])
def get_user_performance():
    """Retrieves user's quiz performance and provides simple suggestions.

    Returns (JSON): {'success', 'average_score', 'quizzes', 'suggestions'}.

    Fix: the original indexed attempt['quizzes']['id'] (and the nested
    notes/study_materials chain) unconditionally, crashing with a
    TypeError/KeyError whenever the joined row is NULL — e.g. an attempt whose
    quiz was deleted. Such attempts are now skipped and missing nested joins
    fall back to None for the title.
    """
    try:
        # --- Authentication ---
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']
        # --- Query Attempts with Proper Error Handling ---
        attempts_res = supabase.table('quiz_attempts') \
            .select('id, quiz_id, score, submitted_at, quizzes(id, difficulty, created_at, notes(study_materials(title)))') \
            .eq('user_id', user.id) \
            .order('submitted_at', desc=True) \
            .execute()
        if hasattr(attempts_res, 'error') and attempts_res.error:
            raise Exception(attempts_res.error.message)
        attempts_data = attempts_res.data or []
        # --- Group Attempts by Quiz ---
        quizzes = {}
        for attempt in attempts_data:
            quiz = attempt.get('quizzes')
            if not quiz:
                # Orphaned attempt (quiz row deleted); the join returns None.
                logging.warning(f"Skipping attempt {attempt.get('id')} with missing quiz join.")
                continue
            quiz_id = quiz['id']
            if quiz_id not in quizzes:
                notes = quiz.get('notes') or {}
                materials = notes.get('study_materials') or {}
                quizzes[quiz_id] = {
                    'quiz_info': {
                        'id': quiz_id,
                        'title': materials.get('title'),
                        'difficulty': quiz.get('difficulty'),
                        'created_at': quiz.get('created_at')
                    },
                    'attempts': []
                }
            quizzes[quiz_id]['attempts'].append(attempt)
        # --- Calculate Averages ---
        performance_data = []
        overall_scores = []
        for quiz_id, quiz_data in quizzes.items():
            scores = [a['score'] for a in quiz_data['attempts']]
            avg_score = sum(scores) / len(scores) if scores else 0
            overall_scores.extend(scores)
            performance_data.append({
                **quiz_data,
                'average_score': avg_score,
                'attempt_count': len(scores)
            })
        # --- Calculate Overall Average ---
        average_score = sum(overall_scores) / len(overall_scores) if overall_scores else 0
        # --- Generate Suggestions ---
        suggestions = []
        if performance_data:
            if average_score < 60:
                suggestions.append("Your average score is a bit low. Try reviewing the notes more thoroughly before taking quizzes.")
                # Point the user at their lowest-scoring quiz.
                weakest_quiz = min(performance_data, key=lambda x: x['average_score'])
                suggestions.append(f"Focus on: '{weakest_quiz['quiz_info']['title']}' (current average: {weakest_quiz['average_score']:.0f}%)")
            elif average_score > 85:
                suggestions.append("Great job! Try some 'hard' difficulty quizzes.")
            else:
                suggestions.append("You're making good progress! Keep practicing.")
        return jsonify({
            'success': True,
            'average_score': round(average_score, 2),
            'quizzes': performance_data,  # Changed from recent_attempts to quizzes
            'suggestions': suggestions
        })
    except Exception as e:
        logging.error(f"Performance endpoint error: {str(e)}")
        logging.error(traceback.format_exc())
        return jsonify({'error': 'Internal server error'}), 500
def generate_suggestions(quizzes, overall_avg):
    """Generate personalized suggestions based on quiz performance.

    Args:
        quizzes: list of dicts, each with 'average_score' (float) and
                 'quiz_info' ({'title', 'difficulty', ...}) keys. May be empty.
        overall_avg: overall average score across all attempts (0-100 scale).

    Returns:
        list[str]: human-readable study suggestions (always at least one).

    Fix: the original divided by len(quizzes) unconditionally, raising
    ZeroDivisionError for an empty list (empty input is clearly expected,
    since min(..., default=None) already guards it).
    """
    suggestions = []
    if overall_avg < 60:
        suggestions.append("Your average score is a bit low. Try reviewing notes before retaking quizzes.")
    elif overall_avg > 85:
        suggestions.append("Great job! Challenge yourself with harder difficulty levels.")
    else:
        suggestions.append("Keep practicing! Focus on your weaker areas for improvement.")
    # Highlight the weakest quiz, if any (default=None handles an empty list).
    weakest = min(quizzes, key=lambda x: x['average_score'], default=None)
    if weakest and weakest['average_score'] < 60:
        title = weakest['quiz_info']['title'] or "your recent quizzes"
        suggestions.append(f"Focus on improving in '{title}' (current average: {weakest['average_score']:.0f}%).")
    # Difficulty distribution — guarded so an empty list can't divide by zero.
    if quizzes:
        difficulty_count = {}
        for quiz in quizzes:
            diff = quiz['quiz_info']['difficulty']
            difficulty_count[diff] = difficulty_count.get(diff, 0) + 1
        if difficulty_count.get('easy', 0) / len(quizzes) > 0.7:
            suggestions.append("Try more medium difficulty quizzes to push your skills!")
    return suggestions
# === Admin Endpoints (Adapted for Supabase) ===
@app.route('/api/admin/users', methods=['GET'])
def admin_list_users():
    """Admin-only: return every profile row (profiles map 1-1 to auth users)."""
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    is_admin, admin_error = verify_admin(user)
    if admin_error:
        return jsonify({'error': admin_error['error']}), admin_error['status']
    try:
        result = supabase.table('profiles').select('*').execute()
        return jsonify({'users': result.data}), 200
    except Exception as e:
        logging.error(f"Admin list users error: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/admin/users/<uuid:target_user_id>/suspend', methods=['PUT'])
def admin_suspend_user(target_user_id):
    """Admin-only: suspend or unsuspend a user's account.

    Body (JSON): {'action': 'suspend' | 'unsuspend'}.

    Fixes: Flask's uuid converter passes a uuid.UUID object, while every other
    endpoint in this file stringifies ids before .eq(); get_json() may return
    None for a missing body; and the old failure path read update_res.error,
    an attribute the response object may not have (AttributeError would then
    mask the real failure).
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    is_admin, admin_error = verify_admin(user)
    if admin_error:
        return jsonify({'error': admin_error['error']}), admin_error['status']
    try:
        data = request.get_json(silent=True) or {}
        action = data.get('action')  # "suspend" or "unsuspend"
        if action not in ["suspend", "unsuspend"]:
            return jsonify({'error': 'action must be "suspend" or "unsuspend"'}), 400
        should_suspend = (action == "suspend")
        target_id = str(target_user_id)  # stringify UUID, consistent with other endpoints
        # Update the 'suspended' flag in the profiles table
        update_res = supabase.table('profiles').update({'suspended': should_suspend}).eq('id', target_id).execute()
        if not update_res.data:
            # No rows updated: either the user doesn't exist, or something else failed.
            user_check = supabase.table('profiles').select('id').eq('id', target_id).maybe_single().execute()
            if not user_check.data:
                return jsonify({'error': 'User not found'}), 404
            raise Exception(f"Failed to update suspension status: {getattr(update_res, 'error', None)}")
        return jsonify({'success': True, 'message': f'User {target_user_id} suspension status set to {should_suspend}'}), 200
    except Exception as e:
        logging.error(f"Admin suspend user error: {e}")
        return jsonify({'error': str(e)}), 500
# Add other admin endpoints (update credits, view specific data) similarly,
# === Credit Management Endpoints ===
@app.route('/api/user/credits/request', methods=['POST'])
def request_credits():
    """Let an authenticated user file a pending credit top-up request."""
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    try:
        payload = request.get_json()
        amount = payload.get('amount')
        note = payload.get('note', '')
        # Only positive integers are acceptable request amounts.
        if not amount or not isinstance(amount, int) or amount <= 0:
            return jsonify({'error': 'Invalid amount (must be positive integer)'}), 400
        inserted = supabase.table('credit_requests').insert({
            'user_id': user.id,
            'amount': amount,
            'status': 'pending',
            'note': note,
            'created_at': datetime.now().isoformat()
        }).execute()
        return jsonify({
            'success': True,
            'request_id': inserted.data[0]['id']
        }), 201
    except Exception as e:
        logging.error(f"Credit request failed for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/admin/credit-requests', methods=['GET'])
def admin_get_credit_requests():
    """Admin-only: list credit requests filtered by ?status= (default 'pending')."""
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    is_admin, admin_error = verify_admin(user)
    if admin_error:
        return jsonify({'error': admin_error['error']}), admin_error['status']
    try:
        requested_status = request.args.get('status', 'pending')
        result = supabase.table('credit_requests').select('*').eq('status', requested_status).execute()
        return jsonify(result.data), 200
    except Exception as e:
        logging.error(f"Admin credit requests fetch failed: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/admin/credit-requests/<uuid:request_id>', methods=['PUT'])
def admin_review_credit_request(request_id):
    """Admin-only: approve or decline a pending credit request.

    Body (JSON): {'action': 'approve' | 'decline', 'note': optional str}.
    On approval the requested amount is added to the requester's balance.

    Fix: the original computed the new balance as
    `supabase.table('profiles').credits + req['amount']` — a query builder has
    no `.credits` attribute, so every approval raised AttributeError. Replaced
    with an explicit read of the current balance followed by an update.
    (NOTE(review): read-then-write is not atomic; a DB-side RPC/increment would
    be safer under concurrency.)
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    is_admin, admin_error = verify_admin(user)
    if admin_error:
        return jsonify({'error': admin_error['error']}), admin_error['status']
    try:
        data = request.get_json(silent=True) or {}
        action = data.get('action')
        admin_note = data.get('note', '')
        if action not in ['approve', 'decline']:
            return jsonify({'error': 'Invalid action'}), 400
        req_res = supabase.table('credit_requests').select('*').eq('id', str(request_id)).maybe_single().execute()
        if not req_res.data:
            return jsonify({'error': 'Request not found'}), 404
        req = req_res.data
        if req['status'] != 'pending':
            return jsonify({'error': 'Request already processed'}), 400
        update_data = {
            'status': 'approved' if action == 'approve' else 'declined',
            'reviewed_at': datetime.now().isoformat(),
            'reviewed_by': user.id,
            'admin_note': admin_note
        }
        if action == 'approve':
            # Read the requester's current balance, then write the incremented value.
            profile_res = supabase.table('profiles').select('credits').eq('id', req['user_id']).single().execute()
            current_credits = (profile_res.data or {}).get('credits', 0)
            supabase.table('profiles').update(
                {'credits': current_credits + req['amount']}
            ).eq('id', req['user_id']).execute()
        supabase.table('credit_requests').update(update_data).eq('id', str(request_id)).execute()
        return jsonify({'success': True}), 200
    except Exception as e:
        logging.error(f"Credit request processing failed: {e}")
        return jsonify({'error': str(e)}), 500
# === Main Execution ===
if __name__ == '__main__':
    # Warn (but still start) when any required secret is absent.
    required_env = [SUPABASE_URL, SUPABASE_SERVICE_KEY, GEMINI_API_KEY, ELEVENLABS_API_KEY]
    if not all(required_env):
        print("WARNING: One or more essential environment variables (SUPABASE_URL, SUPABASE_SERVICE_KEY, GEMINI_API_KEY, ELEVENLABS_API_KEY) are missing!")
    print("Starting Flask server for AI Tutor...")
    # Use Gunicorn or Waitress for production instead of app.run(debug=True)
    app.run(debug=True, host="0.0.0.0", port=7860)