ProfAI-API / main.py
rairo's picture
Update main.py
1001868 verified
# main.py
import os
import uuid
import json
import traceback
import threading
import time
from datetime import datetime, timezone
from flask import Flask, request, jsonify
from flask_cors import CORS
import firebase_admin
from firebase_admin import credentials, db, storage, auth
from flask_apscheduler import APScheduler
import requests
from lesson_gen import (
fetch_arxiv_papers, generate_knowledge_base,
generate_lesson_from_knowledge_base, generate_remedial_lesson,
generate_profai_video_from_script
)
import logging
# --- 1. CONFIGURATION & INITIALIZATION ---
# Logging is configured first. The module-level logging functions
# (logging.info, logging.fatal, ...) implicitly call basicConfig() with default
# settings when the root logger has no handler yet; once that has happened a
# later explicit basicConfig() is ignored, so the INFO level and the format
# below would never be applied.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

app = Flask(__name__)
CORS(app)

# Initialize Scheduler
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()

# Firebase is configured entirely from environment variables:
#   FIREBASE          - service-account credentials as a JSON string
#   Firebase_DB       - Realtime Database URL
#   Firebase_Storage  - Cloud Storage bucket name
try:
    credentials_json_string = os.environ.get("FIREBASE")
    if not credentials_json_string:
        raise ValueError("FIREBASE env var not set.")
    credentials_json = json.loads(credentials_json_string)
    firebase_db_url = os.environ.get("Firebase_DB")
    firebase_storage_bucket = os.environ.get("Firebase_Storage")
    if not all([firebase_db_url, firebase_storage_bucket]):
        raise ValueError("Firebase DB/Storage env vars must be set.")
    cred = credentials.Certificate(credentials_json)
    firebase_admin.initialize_app(cred, {'databaseURL': firebase_db_url, 'storageBucket': firebase_storage_bucket})
    logger.info("Firebase Admin SDK initialized successfully for ProfAI.")
except Exception as e:
    logger.fatal(f"FATAL: Error initializing Firebase: {e}")

# NOTE(review): if initialization failed above, this call presumably raises
# because no default Firebase app exists - confirm that is the intended way
# for a misconfigured deployment to stop.
bucket = storage.bucket()

# API key for the Resend email service; emails are skipped when it is unset.
RESEND_API_KEY = os.environ.get("RESEND_API_KEY")
# --- 2. HELPER FUNCTIONS & WORKERS ---
def verify_token(token):
    """Return the ``uid`` claim of a Firebase ID token, or ``None`` on any failure.

    Any exception raised while verifying or reading the token is logged and
    swallowed so that callers only need a truthiness check.
    """
    try:
        decoded = auth.verify_id_token(token)
        uid = decoded['uid']
    except Exception as exc:
        logging.error(f"Token verification failed: {exc}")
        return None
    return uid
# --- Resend Email Notification System (Ported from Sozo) ---
class ResendRateLimiter:
    """Space out outbound email requests to a maximum rate.

    Callers invoke :meth:`wait_if_needed` right before each request. The call
    blocks just long enough that consecutive requests are at least
    ``min_interval`` seconds apart.
    """

    def __init__(self, requests_per_second=1):
        """Create a limiter.

        Args:
            requests_per_second: Maximum number of requests allowed per second.

        Raises:
            ValueError: If ``requests_per_second`` is not positive.
        """
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be a positive number")
        self.requests_per_second = requests_per_second
        self.min_interval = 1.0 / requests_per_second
        # -inf means "nothing sent yet", so the very first call never waits
        # regardless of where the monotonic clock currently stands.
        self.last_request_time = float("-inf")
        self.lock = threading.Lock()

    def wait_if_needed(self):
        """Block until at least ``min_interval`` has passed since the last call."""
        # The lock is held while sleeping on purpose: concurrent callers queue
        # up behind each other instead of all waking at the same instant.
        with self.lock:
            # A monotonic clock is used so that system clock adjustments cannot
            # produce a negative elapsed time and an arbitrarily long sleep.
            elapsed = time.monotonic() - self.last_request_time
            if elapsed < self.min_interval:
                sleep_time = self.min_interval - elapsed
                logger.info(f"Rate limiting: waiting {sleep_time:.2f} seconds before sending email")
                time.sleep(sleep_time)
            self.last_request_time = time.monotonic()


# Shared limiter used for every Resend API call made by this process.
resend_rate_limiter = ResendRateLimiter(requests_per_second=1)
def _send_notification(user_email, email_subject, email_body):
    """Send one HTML email through the Resend API.

    Args:
        user_email: Recipient address.
        email_subject: Subject line.
        email_body: HTML body of the message.

    Returns:
        True when Resend accepted the request, False when the API key is not
        configured or the request failed (the failure is logged).
    """
    if not RESEND_API_KEY:
        logger.error("RESEND_API_KEY is not configured. Cannot send email.")
        return False

    resend_rate_limiter.wait_if_needed()

    headers = {"Authorization": f"Bearer {RESEND_API_KEY.strip()}", "Content-Type": "application/json"}
    payload = {"from": "ProfAI <lessons@sozofix.tech>", "to": [user_email], "subject": email_subject, "html": email_body}
    try:
        # Without a timeout a stalled connection would block the calling worker
        # forever. A timeout surfaces as a RequestException and is handled below.
        response = requests.post("https://api.resend.com/emails", headers=headers, json=payload, timeout=15)
        response.raise_for_status()
        logger.info(f"Successfully sent email to {user_email}. Subject: {email_subject}")
        return True
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to send email to {user_email} via Resend: {e}")
        error_response = getattr(e, 'response', None)
        if error_response is not None:
            logger.error(f"Error response status: {error_response.status_code}, content: {error_response.text}")
        return False
def send_lesson_ready_email(user_id, course_id, course_topic, lesson_concept):
    """Email the user that a new lesson of their course is available.

    The recipient address is read from ``users/<user_id>/email``. When no
    address is stored a warning is logged and nothing is sent.

    Args:
        user_id: UID of the course owner.
        course_id: Course identifier, used to build the link in the email.
        course_topic: Course topic shown in the email body.
        lesson_concept: Name of the new lesson, shown in subject and body.
    """
    import html

    user_data = db.reference(f'users/{user_id}').get()
    if not user_data or not user_data.get('email'):
        logger.warning(f"No email found for user {user_id}, cannot send notification.")
        return

    email = user_data['email']
    subject = f"✅ Your New ProfAI Lesson is Ready: {lesson_concept}"

    # The topic originates from the course-creation request, so it must be
    # escaped before being placed in HTML; the concept and id are escaped too
    # so the template never emits raw markup.
    safe_topic = html.escape(str(course_topic))
    safe_concept = html.escape(str(lesson_concept))
    safe_course_id = html.escape(str(course_id))

    body = f"""
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: auto; border: 1px solid #ddd; border-radius: 10px; padding: 20px;">
<h1 style="color: #4A90E2; text-align: center;">ProfAI Lesson Ready!</h1>
<p>Hi there,</p>
<p>Great news! Your next video lesson for the course "<strong>{safe_topic}</strong>" is now available.</p>
<p><strong>New Lesson:</strong> {safe_concept}</p>
<div style="text-align: center; margin: 30px 0;">
<a href="https://profai.sozofix.tech/course/{safe_course_id}"
style="background-color: #4A90E2; color: white; padding: 15px 30px; text-decoration: none; border-radius: 8px; font-weight: bold;">
Start Learning Now
</a>
</div>
<p>Keep up the great work!</p>
<p><em>— The ProfAI Team</em></p>
</div>
"""
    _send_notification(email, subject, body)
# --- WORKER FUNCTIONS (Called by APScheduler) ---
def generation_worker(course_id, user_id, topic, level, goal):
    """Background job that builds a new course and its first lesson.

    Steps: fetch arXiv papers for the topic, generate the knowledge base,
    generate the first lesson and its video, upload the video, then store the
    first video/quiz steps and mark the course ``ready``. Afterwards the user
    is charged 8 credits and notified by email.

    Any failure before the course is marked ready sets the course status to
    ``failed`` with the error text. Failures while charging or notifying are
    only logged, so that a course which was delivered is not reported as
    failed.

    Args:
        course_id: Key of the course under ``profai_courses``.
        user_id: UID of the course owner.
        topic: Course topic.
        level: Learner level, forwarded to knowledge-base generation.
        goal: Learner goal, forwarded to knowledge-base generation.
    """
    logger.info(f"WORKER [{course_id}]: Starting for topic '{topic}'.")
    course_ref = db.reference(f'profai_courses/{course_id}')
    cost = 8

    try:
        arxiv_docs = fetch_arxiv_papers(topic)
        knowledge_base = generate_knowledge_base(topic, level, goal, arxiv_docs)
        learning_path = knowledge_base.get("learning_path", [])
        course_ref.update({"knowledgeBase": knowledge_base, "learningPath": learning_path, "status": "generating_video"})

        first_concept = learning_path[0] if learning_path else "Introduction"
        lesson_content = generate_lesson_from_knowledge_base(knowledge_base, first_concept)
        video_bytes = generate_profai_video_from_script(lesson_content['script'], topic)
        if not video_bytes:
            raise Exception("Video generation failed, cannot proceed.")

        blob_name = f"profai_courses/{user_id}/{course_id}/lesson_1.mp4"
        blob = bucket.blob(blob_name)
        blob.upload_from_string(video_bytes, content_type="video/mp4")

        steps = [{
            "step": 1, "type": "video", "concept": first_concept,
            "videoUrl": blob.public_url, "status": "unlocked"
        }, {
            "step": 2, "type": "quiz", "concept": first_concept,
            "quizData": lesson_content['quiz'], "status": "locked"
        }]
        course_ref.update({"status": "ready", "steps": steps})
    except Exception as e:
        logger.error(f"WORKER [{course_id}]: Failed: {traceback.format_exc()}")
        course_ref.update({"status": "failed", "error": str(e)})
        return

    # The course is delivered at this point; billing and notification problems
    # must not overwrite its "ready" status.
    try:
        # A transaction makes the deduction atomic with respect to other
        # workers charging the same user, and tolerates a missing balance.
        db.reference(f'users/{user_id}/credits').transaction(
            lambda current: (current or 0) - cost
        )
        logger.info(f"WORKER [{course_id}]: Success. Charged {cost} credits.")
    except Exception:
        logger.error(f"WORKER [{course_id}]: Course is ready but charging credits failed: {traceback.format_exc()}")

    try:
        send_lesson_ready_email(user_id, course_id, topic, first_concept)
    except Exception:
        logger.error(f"WORKER [{course_id}]: Course is ready but the notification email failed: {traceback.format_exc()}")
def lesson_worker(course_id, user_id, concept, current_steps, is_remedial=False, regenerate_step_num=None):
    """Background job that generates (or regenerates) one lesson of a course.

    A lesson is a video step followed by a quiz step. Normal lessons are built
    from the course knowledge base and cost 8 credits; remedial lessons cost 5.
    On success the updated step list is written to the course, the user is
    charged and notified by email. On failure the affected step is marked
    ``generation_failed``.

    Args:
        course_id: Key of the course under ``profai_courses``.
        user_id: UID of the course owner.
        concept: Concept the lesson should teach.
        current_steps: Step list as known to the caller; it is modified in
            place and written back to the database.
        is_remedial: Generate a remedial lesson instead of a regular one.
        regenerate_step_num: 1-based number of an existing video step to
            replace; ``None`` appends a new lesson.
    """
    action = "Regenerating" if regenerate_step_num else "Generating"
    logger.info(f"WORKER [{course_id}]: {action} lesson for concept '{concept}'. Remedial: {is_remedial}")
    course_ref = db.reference(f'profai_courses/{course_id}')
    topic = 'your course'
    cost = 5 if is_remedial else 8

    try:
        course_data = course_ref.get() or {}
        knowledge_base = course_data.get('knowledgeBase')
        topic = course_data.get('topic', 'your course')
        if not knowledge_base:
            raise Exception("Knowledge base not found for course.")

        if is_remedial:
            lesson_content = generate_remedial_lesson(concept)
        else:
            lesson_content = generate_lesson_from_knowledge_base(knowledge_base, concept)

        video_bytes = generate_profai_video_from_script(lesson_content['script'], topic)
        if not video_bytes:
            raise Exception("Video generation failed.")

        # The quiz endpoint may have appended a bare "generating" video step as
        # a progress indicator. The finished lesson takes that slot instead of
        # being appended after it, otherwise the indicator would stay in the
        # course forever.
        last_step = current_steps[-1] if current_steps else None
        replaces_placeholder = (
            not regenerate_step_num
            and isinstance(last_step, dict)
            and last_step.get('type') == 'video'
            and last_step.get('status') == 'generating'
            and 'videoUrl' not in last_step
        )

        if regenerate_step_num:
            step_num = regenerate_step_num
        elif replaces_placeholder:
            step_num = len(current_steps)
        else:
            step_num = len(current_steps) + 1

        # The random suffix keeps a regenerated video from reusing the old
        # object name.
        blob_name = f"profai_courses/{user_id}/{course_id}/lesson_{step_num}_{uuid.uuid4().hex[:4]}.mp4"
        blob = bucket.blob(blob_name)
        blob.upload_from_string(video_bytes, content_type="video/mp4")

        new_video_step = {"step": step_num, "type": "video", "concept": concept, "videoUrl": blob.public_url, "status": "unlocked"}
        new_quiz_step = {"step": step_num + 1, "type": "quiz", "concept": concept, "quizData": lesson_content['quiz'], "status": "locked"}

        if regenerate_step_num:
            current_steps[regenerate_step_num - 1] = new_video_step
            # Refresh the quiz that belongs to the regenerated video, if any.
            if (regenerate_step_num < len(current_steps)) and current_steps[regenerate_step_num]['type'] == 'quiz':
                current_steps[regenerate_step_num] = new_quiz_step
        else:
            if replaces_placeholder:
                current_steps.pop()
            current_steps.extend([new_video_step, new_quiz_step])
        course_ref.child('steps').set(current_steps)
    except Exception:
        logger.error(f"WORKER [{course_id}]: Lesson generation for '{concept}' failed: {traceback.format_exc()}")
        # Flag the step this job was working on: the regenerated step when
        # regenerating, otherwise the last step. Nothing is written when there
        # is no step to flag.
        if regenerate_step_num and 0 < regenerate_step_num <= len(current_steps):
            failed_index = regenerate_step_num - 1
        else:
            failed_index = len(current_steps) - 1
        if failed_index >= 0:
            try:
                current_steps[failed_index]['status'] = 'generation_failed'
                course_ref.child('steps').set(current_steps)
            except Exception:
                logger.error(f"WORKER [{course_id}]: Could not record the failure: {traceback.format_exc()}")
        return

    # The lesson is stored at this point; billing and notification problems are
    # logged without marking the lesson as failed.
    try:
        # Atomic deduction that also tolerates a missing balance.
        db.reference(f'users/{user_id}/credits').transaction(
            lambda current: (current or 0) - cost
        )
        logger.info(f"WORKER [{course_id}]: Lesson '{concept}' processed. Charged {cost} credits.")
    except Exception:
        logger.error(f"WORKER [{course_id}]: Lesson '{concept}' stored but charging credits failed: {traceback.format_exc()}")

    try:
        send_lesson_ready_email(user_id, course_id, topic, concept)
    except Exception:
        logger.error(f"WORKER [{course_id}]: Lesson '{concept}' stored but the notification email failed: {traceback.format_exc()}")
# --- 3. USER AUTHENTICATION & MANAGEMENT ---
@app.route('/api/auth/google-signin', methods=['POST'])
def google_signin():
    """Sign a user in with a Firebase ID token, creating the profile on first use.

    The token is taken from the second space-separated part of the
    ``Authorization`` header. New users get a profile with 20 credits.

    Returns:
        200 with ``{'success': True, 'user': {...}}``; 401 when the header
        carries no token; 400 for any other failure (including an invalid
        token).
    """
    try:
        header_parts = request.headers.get('Authorization', '').split(' ')
        # A header without a token used to raise IndexError and come back as
        # 400 "list index out of range".
        if len(header_parts) < 2 or not header_parts[1]:
            return jsonify({'error': 'Missing or invalid Authorization header'}), 401

        decoded_token = auth.verify_id_token(header_parts[1])
        uid, email = decoded_token['uid'], decoded_token.get('email')

        user_ref = db.reference(f'users/{uid}')
        user_data = user_ref.get()
        if not user_data:
            logger.info(f"New user signed in with Google: {email}, UID: {uid}. Creating profile.")
            # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
            # switching to an aware timestamp would change the stored format.
            user_data = {'email': email, 'credits': 20, 'created_at': datetime.utcnow().isoformat()}
            user_ref.set(user_data)
        else:
            logger.info(f"Existing user signed in with Google: {email}, UID: {uid}.")

        # The profile just read or written is returned directly instead of
        # being fetched a second time.
        return jsonify({'success': True, 'user': {'uid': uid, **user_data}}), 200
    except Exception as e:
        logger.error(f"Google Sign-in failed: {traceback.format_exc()}")
        return jsonify({'error': str(e)}), 400
# --- 4. ProfAI API ENDPOINTS ---
# Lists the authenticated user's courses, newest first.
@app.route('/api/profai/courses', methods=['GET'])
def get_user_courses():
    """Return every course owned by the caller, most recently created first.

    Responds 401 when the ``Authorization`` header is not a Bearer header or
    the token cannot be verified, 200 with a (possibly empty) JSON list
    otherwise, and 500 on unexpected errors.
    """
    try:
        authorization = request.headers.get('Authorization', '')
        if not authorization.startswith('Bearer '):
            return jsonify({'error': 'Missing or invalid Authorization header'}), 401

        uid = verify_token(authorization.split(' ')[1])
        if not uid:
            return jsonify({'error': 'Unauthorized or invalid token'}), 401

        # Select the courses whose 'uid' child equals the caller's UID.
        owned = db.reference('profai_courses').order_by_child('uid').equal_to(uid).get()
        if not owned:
            # No courses yet: answer with an empty list.
            return jsonify([]), 200

        # The query result is keyed by course id; only the values are returned.
        newest_first = list(owned.values())
        newest_first.sort(key=lambda course: course.get('createdAt', ''), reverse=True)
        return jsonify(newest_first), 200
    except Exception as exc:
        logger.error(f"Error in get_user_courses: {traceback.format_exc()}")
        return jsonify({'error': str(exc)}), 500
@app.route('/api/profai/start-course', methods=['POST'])
def start_course():
    """Create a course record and schedule its generation in the background.

    Expects a JSON body with ``topic``, ``level`` and ``goal`` and requires the
    caller to hold at least 8 credits.

    Returns:
        202 with the new ``courseId``; 401 when the caller cannot be
        authenticated; 402 when credits are insufficient; 400 when a required
        field is missing; 500 on unexpected errors.
    """
    try:
        # A header without a token is treated as unauthenticated instead of
        # raising IndexError (which surfaced as a 500).
        header_parts = request.headers.get('Authorization', '').split(' ')
        uid = verify_token(header_parts[1]) if len(header_parts) > 1 else None
        if not uid:
            return jsonify({'error': 'Unauthorized'}), 401

        user_credits = db.reference(f'users/{uid}/credits').get()
        if not isinstance(user_credits, (int, float)) or user_credits < 8:
            return jsonify({'error': 'Insufficient credits. Requires 8 credits to start a new course.'}), 402

        # A missing, malformed or non-object body falls through to the 400
        # below instead of failing with a 500.
        payload = request.get_json(silent=True)
        data = payload if isinstance(payload, dict) else {}
        topic, level, goal = data.get('topic'), data.get('level'), data.get('goal')
        if not all([topic, level, goal]):
            return jsonify({'error': 'topic, level, and goal are required'}), 400

        course_id = uuid.uuid4().hex
        course_data = {'id': course_id, 'uid': uid, 'topic': topic, 'status': 'generating', 'createdAt': datetime.utcnow().isoformat()}
        db.reference(f'profai_courses/{course_id}').set(course_data)

        scheduler.add_job(id=f'start_{course_id}', func=generation_worker, args=[course_id, uid, topic, level, goal])
        return jsonify({'success': True, 'message': 'Your personalized course is being generated!', 'courseId': course_id}), 202
    except Exception as e:
        logger.error(f"Error in start_course: {traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/profai/course-status/<string:course_id>', methods=['GET'])
def get_course_status(course_id):
    """Return the stored course record if it belongs to the caller.

    Returns:
        200 with the course data; 401 when the caller cannot be authenticated;
        404 when the course does not exist or is owned by someone else; 500 on
        unexpected errors.
    """
    try:
        # A header without a token is treated as unauthenticated instead of
        # raising IndexError (which surfaced as a 500).
        header_parts = request.headers.get('Authorization', '').split(' ')
        uid = verify_token(header_parts[1]) if len(header_parts) > 1 else None
        if not uid:
            return jsonify({'error': 'Unauthorized'}), 401

        course_data = db.reference(f'profai_courses/{course_id}').get()
        # Missing and foreign courses get the same answer so that course ids of
        # other users cannot be probed.
        if not course_data or course_data.get('uid') != uid:
            return jsonify({'error': 'Course not found or unauthorized'}), 404
        return jsonify(course_data), 200
    except Exception as e:
        logger.error(f"Error in get_course_status: {traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/profai/courses/<string:course_id>', methods=['DELETE'])
def delete_course(course_id):
    """Delete a course owned by the caller together with its stored videos.

    Returns:
        200 on success; 401 when the caller cannot be authenticated; 404 when
        the course does not exist or is owned by someone else; 500 on
        unexpected errors.
    """
    try:
        # A header without a token is treated as unauthenticated instead of
        # raising IndexError (which surfaced as a 500).
        header_parts = request.headers.get('Authorization', '').split(' ')
        uid = verify_token(header_parts[1]) if len(header_parts) > 1 else None
        if not uid:
            return jsonify({'error': 'Unauthorized'}), 401

        course_ref = db.reference(f'profai_courses/{course_id}')
        course_data = course_ref.get()
        if not course_data or course_data.get('uid') != uid:
            return jsonify({'error': 'Course not found or unauthorized'}), 404

        # Remove every object stored under this course's prefix before the
        # database record itself.
        storage_prefix = f"profai_courses/{uid}/{course_id}/"
        for blob in bucket.list_blobs(prefix=storage_prefix):
            blob.delete()

        course_ref.delete()
        return jsonify({'success': True, 'message': 'Course deleted successfully.'}), 200
    except Exception as e:
        logger.error(f"Error in delete_course: {traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/profai/courses/<string:course_id>/steps/<int:step_num>/regenerate', methods=['POST'])
def regenerate_step(course_id, step_num):
    """Schedule regeneration of an existing video step of the caller's course.

    ``step_num`` is the 1-based position of the step. The step is marked
    ``regenerating`` and a background job is scheduled to rebuild it.

    Returns:
        202 when the job was scheduled; 401 when the caller cannot be
        authenticated; 402 when the caller has fewer than 8 credits; 404 when
        the course does not exist or is owned by someone else; 400 when the
        step is not a regenerable video step; 500 on unexpected errors.
    """
    try:
        # A header without a token is treated as unauthenticated instead of
        # raising IndexError (which surfaced as a 500).
        header_parts = request.headers.get('Authorization', '').split(' ')
        uid = verify_token(header_parts[1]) if len(header_parts) > 1 else None
        if not uid:
            return jsonify({'error': 'Unauthorized'}), 401

        user_credits = db.reference(f'users/{uid}/credits').get()
        # A missing or non-numeric balance counts as insufficient, as in
        # start_course, rather than raising TypeError on the comparison.
        if not isinstance(user_credits, (int, float)) or user_credits < 8:
            return jsonify({'error': 'Insufficient credits. Regeneration requires 8 credits.'}), 402

        course_ref = db.reference(f'profai_courses/{course_id}')
        course_data = course_ref.get()
        if not course_data or course_data.get('uid') != uid:
            return jsonify({'error': 'Course not found or unauthorized'}), 404

        steps = course_data.get('steps', [])
        target = steps[step_num - 1] if 0 < step_num <= len(steps) else None
        # Steps lacking a type or concept cannot be regenerated; they are
        # rejected here instead of raising KeyError.
        if not isinstance(target, dict) or target.get('type') != 'video' or not target.get('concept'):
            return jsonify({'error': 'Invalid step number. You can only regenerate video steps.'}), 400

        concept_to_regenerate = target['concept']
        target['status'] = 'regenerating'
        course_ref.child('steps').set(steps)

        scheduler.add_job(id=f'regen_{course_id}_{step_num}', func=lesson_worker, args=[course_id, uid, concept_to_regenerate, steps, False, step_num])
        return jsonify({'success': True, 'message': f"Regenerating lesson for '{concept_to_regenerate}'."}), 202
    except Exception as e:
        logger.error(f"Error in regenerate_step: {traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/profai/submit-quiz/<string:course_id>', methods=['POST'])
def submit_quiz(course_id):
    """Grade the active quiz of a course and decide what happens next.

    The JSON body carries ``answers``, an object mapping question text to the
    chosen answer. The first quiz step with status ``unlocked`` is graded and
    marked ``completed``. A score above 0.6 passes:

    * pass - the next concept of the learning path is scheduled (8 credits
      needed), or a ``course_complete`` step is added after the last concept;
    * otherwise - a remedial lesson on the same concept is scheduled
      (5 credits needed).

    When credits are insufficient an ``insufficient_credits`` step is added
    instead of scheduling a job.

    Returns:
        202 with the score and a message; 401 when the caller cannot be
        authenticated; 404 when the course is missing or owned by someone
        else; 400 when there is no active quiz or ``answers`` is not an
        object; 500 on unexpected errors.
    """
    try:
        # A header without a token is treated as unauthenticated instead of
        # raising IndexError (which surfaced as a 500).
        header_parts = request.headers.get('Authorization', '').split(' ')
        uid = verify_token(header_parts[1]) if len(header_parts) > 1 else None
        if not uid:
            return jsonify({'error': 'Unauthorized'}), 401

        payload = request.get_json(silent=True)
        answers = payload.get('answers', {}) if isinstance(payload, dict) else {}
        if not isinstance(answers, dict):
            return jsonify({'error': 'answers must be an object keyed by question text'}), 400

        course_ref = db.reference(f'profai_courses/{course_id}')
        course_data = course_ref.get()
        if not course_data or course_data.get('uid') != uid:
            return jsonify({'error': 'Course not found'}), 404

        steps = course_data.get('steps', [])
        learning_path = course_data.get('learningPath', [])

        quiz_step_index = next((i for i, step in enumerate(steps) if step.get('status') == 'unlocked' and step.get('type') == 'quiz'), -1)
        if quiz_step_index == -1:
            return jsonify({'error': 'No active quiz found'}), 400

        quiz_data = steps[quiz_step_index].get('quizData') or []
        if not quiz_data:
            # Previously surfaced as KeyError / "division by zero".
            return jsonify({'error': 'Quiz data is missing or empty.'}), 500

        correct_count = sum(1 for q in quiz_data if answers.get(q['question']) == q['answer'])
        score = correct_count / len(quiz_data)
        steps[quiz_step_index]['status'] = 'completed'

        # A missing or non-numeric balance counts as zero credits instead of
        # raising TypeError on the comparisons below.
        raw_credits = db.reference(f'users/{uid}/credits').get()
        user_credits = raw_credits if isinstance(raw_credits, (int, float)) else 0

        next_action_message = ""
        if score > 0.6:  # Pass
            if user_credits < 8:
                steps.append({"step": len(steps) + 1, "type": "insufficient_credits", "status": "unlocked", "message": "You passed, but need more credits for the next lesson."})
                next_action_message = "Passed! But you need more credits to continue."
            else:
                last_concept = steps[quiz_step_index].get('concept')
                try:
                    next_concept_index = learning_path.index(last_concept) + 1
                except ValueError:
                    return jsonify({'error': 'Could not determine the next lesson step.'}), 500

                if next_concept_index < len(learning_path):
                    next_concept = learning_path[next_concept_index]
                    # Progress indicator for the UI. The quiz is normally the
                    # last step, so a "generating" step is appended (as the
                    # remedial branch does); indexing the non-existent
                    # following step used to make every passed quiz fail.
                    if quiz_step_index + 1 < len(steps):
                        steps[quiz_step_index + 1]['status'] = 'generating'
                    else:
                        steps.append({"step": len(steps) + 1, "type": "video", "status": "generating"})
                    scheduler.add_job(id=f'lesson_{course_id}_{next_concept_index}', func=lesson_worker, args=[course_id, uid, next_concept, steps, False])
                    next_action_message = f"Passed! Generating your next lesson on '{next_concept}'."
                else:
                    steps.append({"step": len(steps) + 1, "type": "course_complete", "status": "unlocked"})
                    next_action_message = "Congratulations! You've completed the course."
        else:  # Struggle
            if user_credits < 5:
                steps.append({"step": len(steps) + 1, "type": "insufficient_credits", "status": "unlocked", "message": "Let's review, but you need more credits for a remedial lesson."})
                next_action_message = "Let's review! But you need more credits to generate a remedial lesson."
            else:
                failed_concept = steps[quiz_step_index]['concept']
                steps.append({"step": len(steps) + 1, "type": "video", "status": "generating"})  # For UI
                scheduler.add_job(id=f'remedial_{course_id}', func=lesson_worker, args=[course_id, uid, failed_concept, steps, True])
                next_action_message = f"Let's review. Generating a remedial lesson on '{failed_concept}'."

        course_ref.child('steps').set(steps)
        return jsonify({'success': True, 'score': score, 'message': next_action_message}), 202
    except Exception as e:
        logger.error(traceback.format_exc())
        return jsonify({'error': str(e)}), 500
# --- 5. MAIN EXECUTION ---
if __name__ == '__main__':
    port = int(os.environ.get("PORT", 7860))
    # The server listens on all interfaces, so the interactive debugger (which
    # lets a client execute arbitrary code) must not be on by default. Set
    # FLASK_DEBUG=1 to enable it for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes")
    app.run(debug=debug_enabled, host="0.0.0.0", port=port)