app2 / code.txt
Dooratre's picture
Upload 318 files
c37398c verified
================================================================================
PYTHON CODE COLLECTION
Project Folder: main_project
Total .py files found: 38
================================================================================
================================================================================
FILE 1: app.py
FULL PATH: main_project\app.py
================================================================================
"""
โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
CORVO AI - Complete Teaching Platform
Main Application Entry Point (High-Performance Edition)
Run with gunicorn: gunicorn -c gunicorn_config.py app:app
Or dev mode: python app.py
Open: http://localhost:5000
โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
"""
from gevent import monkey
monkey.patch_all()
import os
import atexit
from flask import Flask, redirect, url_for, session, flash, render_template, send_from_directory, jsonify, request
from flask_socketio import SocketIO
from config import SECRET_KEY, DEBUG, HOST, PORT, REQUIRED_DIRS
# Check if running in Docker: either the /.dockerenv marker file exists or the
# DOCKER environment variable is set to the literal string 'true'.
IN_DOCKER = os.path.exists('/.dockerenv') or os.environ.get('DOCKER', 'false') == 'true'
if IN_DOCKER:
    print(" ๐Ÿณ Running inside Docker container")
    # Check if data volume is mounted: report the size of each expected JSON
    # file and seed any missing one with an empty JSON object ('{}').
    # NOTE(review): the nesting below was reconstructed from a dump that lost
    # its indentation; the loop is assumed to run only inside Docker — confirm.
    for f in ['users.json', 'users_db.json', 'cards.json', 'chat_history_db.json']:
        if os.path.exists(f):
            size = os.path.getsize(f)
            print(f" โœ… {f} found ({size} bytes)")
        else:
            print(f" โš ๏ธ {f} not found - creating empty")
            with open(f, 'w') as fh:
                fh.write('{}')
# --- Create Flask app ---
# The session-signing key is taken from config.SECRET_KEY.
app = Flask(__name__)
app.secret_key = SECRET_KEY
# Socket.IO server bound to the app; cors_allowed_origins="*" accepts
# connections from any origin.
# NOTE(review): async_mode='threading' is selected although this module applies
# gevent's monkey.patch_all() at import time — confirm this pairing is intended.
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading')
# --- Create required directories (audio only, no JSON) ---
for d in REQUIRED_DIRS:
    os.makedirs(d, exist_ok=True)
# --- Initialize in-memory database (loads from GitHub) ---
# `db` is the module-level handle used by the admin/backup routes below.
from memory_db import get_db
db = get_db()
# --- Print DB status (no file init needed) ---
from database.users import init_users_db
from database.cards import init_cards_db
from database.chat_history import init_chat_history_db
init_users_db()
init_cards_db()
init_chat_history_db()
# --- Initialize AI engines ---
# One AIAgent and one BoardEngine are created here and handed to the
# blueprints that need them.
from chat.agent import AIAgent
from board.engine import BoardEngine
agent = AIAgent()
board_engine = BoardEngine()
# --- Register blueprints ---
# Each package's init_* hook is called before its blueprint is registered.
# Auth
from auth.routes import auth_bp, init_auth_socketio
init_auth_socketio(socketio)
app.register_blueprint(auth_bp)
# Chat
from chat.routes import chat_bp, init_chat_agent
init_chat_agent(agent)
app.register_blueprint(chat_bp)
# Board
from board.routes import board_bp, init_board_engine
init_board_engine(board_engine)
app.register_blueprint(board_bp)
# Exam
from exam.routes import exam_bp
app.register_blueprint(exam_bp)
# Media (shares the same AIAgent instance as the chat blueprint)
from media.routes import media_bp, init_media_agent
init_media_agent(agent)
app.register_blueprint(media_bp)
# Market
from market.routes import market_bp
app.register_blueprint(market_bp)
# WebSocket
from websocket.events import register_socketio_events
register_socketio_events(socketio)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# ROOT ROUTES
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
from auth.helpers import is_session_valid
from database.users import load_users_db, get_user
from subjects.definitions import SUBJECTS, is_board_enabled
@app.route('/')
def index():
    """Redirect to the dashboard for a valid session, otherwise to signup."""
    target = 'dashboard' if is_session_valid() else 'auth.signup'
    return redirect(url_for(target))
@app.route('/dashboard')
def dashboard():
    """Render the dashboard for the logged-in user.

    An invalid session or an unknown user clears the session, flashes an
    error and redirects to the login page. Otherwise the subjects defined
    for the user's student type are split into purchased ones (each tagged
    with a ``board_enabled`` flag) and ones still available to buy.
    """
    if not is_session_valid():
        session.clear()
        flash('ุงู†ุชู‡ุช ุงู„ุฌู„ุณุฉุŒ ูŠุฑุฌู‰ ุชุณุฌูŠู„ ุงู„ุฏุฎูˆู„ ู…ุฑุฉ ุฃุฎุฑู‰.', 'error')
        return redirect(url_for('auth.login'))

    username = session['username']
    user_data = get_user(username)
    if not user_data:
        session.clear()
        flash('ุงู„ู…ุณุชุฎุฏู… ุบูŠุฑ ู…ูˆุฌูˆุฏ.', 'error')
        return redirect(url_for('auth.login'))

    track = user_data.get('student_type', 'ุนู„ู…ูŠ')
    catalogue = SUBJECTS.get(track, [])
    owned_ids = user_data.get('purchased_subjects', ['islamic'])

    # Single pass over the catalogue: owned subjects get a copy enriched with
    # the board flag, the rest are passed through untouched.
    owned, not_owned = [], []
    for subject in catalogue:
        if subject['id'] in owned_ids:
            enriched = dict(subject)
            enriched['board_enabled'] = is_board_enabled(subject['id'])
            owned.append(enriched)
        else:
            not_owned.append(subject)

    context = {
        'username': username,
        'user_data': user_data,
        'my_subjects': owned,
        'available_subjects': not_owned,
        'balance': user_data.get('balance', 0),
    }
    return render_template('dashboard.html', **context)
@app.route('/static/<path:filename>')
def serve_static(filename):
    """Serve ``filename`` out of the ``static`` directory."""
    static_dir = 'static'
    return send_from_directory(static_dir, filename)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# DATABASE BACKUP & ADMIN ROUTES (GitHub-only)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Shared secret guarding the admin routes.
# NOTE(review): the fallback value is hard-coded in source; deployments should
# always set the ADMIN_SECRET environment variable.
ADMIN_SECRET = os.environ.get("ADMIN_SECRET", "corvo_admin_2024")


def _check_admin_auth():
    """Return True when the request carries the admin secret.

    The secret is read from the ``X-Admin-Secret`` header, falling back to
    the ``secret`` query parameter. A missing/empty secret is always
    rejected, and the comparison is constant-time so response timing does
    not reveal how many leading characters matched.
    """
    import hmac  # local import: keeps this block self-contained

    supplied = request.headers.get('X-Admin-Secret') or request.args.get('secret', '')
    if not supplied:
        # Fail closed: an empty credential must never authenticate, even if
        # ADMIN_SECRET was misconfigured as an empty string.
        return False
    # compare_digest only accepts ASCII str or bytes, so compare encoded bytes.
    return hmac.compare_digest(supplied.encode('utf-8'), ADMIN_SECRET.encode('utf-8'))
@app.route('/backup_db', methods=['GET', 'POST'])
def backup_db():
    """Push all in-memory data to GitHub.

    Responds 401 without the admin secret, 500 with the error list when the
    push reports failure, and 200 with current stats on success.
    """
    if not _check_admin_auth():
        denied = {"error": "Unauthorized", "hint": "Provide ?secret= or X-Admin-Secret header"}
        return jsonify(denied), 401

    pushed, failures = db.push_to_github()
    if not pushed:
        return jsonify({
            "success": False,
            "message": "Some files failed to push",
            "errors": failures,
            "stats": db.get_stats()
        }), 500

    return jsonify({
        "success": True,
        "message": "All databases pushed to GitHub successfully",
        "stats": db.get_stats()
    })
@app.route('/backup_db_single', methods=['GET', 'POST'])
def backup_db_single():
    """Push a single database to GitHub.

    The store name is taken from the ``store`` query parameter or, failing
    that, from a ``store`` key in a JSON object body. Responds 401 without
    the admin secret, 400 (listing the available stores) when the name is
    missing or unknown, 500 when the push fails, and 200 on success.
    """
    if not _check_admin_auth():
        return jsonify({"error": "Unauthorized"}), 401

    store_name = request.args.get('store', '')
    if not store_name:
        # silent=True: a request with no (or malformed) JSON body yields None
        # instead of an HTTP error raised from inside get_json(), so the
        # explicit 400 below — which lists the valid stores — is reachable.
        payload = request.get_json(silent=True)
        # Only a JSON object can carry the key; ignore lists/scalars.
        if isinstance(payload, dict):
            store_name = payload.get('store', '')

    if not store_name:
        return jsonify({
            "error": "Specify store name",
            "available": list(db.STORE_FILES.keys())
        }), 400

    # A non-string value from JSON can never name a store; reject it here
    # rather than letting the membership test raise on an unhashable value.
    if not isinstance(store_name, str) or store_name not in db.STORE_FILES:
        return jsonify({
            "error": f"Unknown store: {store_name}",
            "available": list(db.STORE_FILES.keys())
        }), 400

    success, error = db.push_single_to_github(store_name)
    if success:
        return jsonify({
            "success": True,
            "message": f"'{store_name}' pushed to GitHub",
            "records": db.count(store_name)
        })
    return jsonify({
        "success": False,
        "error": error
    }), 500
@app.route('/restore_db', methods=['GET', 'POST'])
def restore_db():
    """Pull all data from GitHub into memory.

    Responds 401 without the admin secret, 500 with the reported error when
    the pull fails, and 200 with current stats on success.
    """
    if not _check_admin_auth():
        return jsonify({"error": "Unauthorized"}), 401

    pulled, failure = db.pull_from_github()
    if not pulled:
        return jsonify({
            "success": False,
            "message": "Failed to restore from GitHub",
            "error": failure,
            "stats": db.get_stats()
        }), 500

    return jsonify({
        "success": True,
        "message": "All databases restored from GitHub into memory",
        "stats": db.get_stats()
    })
@app.route('/db_stats', methods=['GET'])
def db_stats():
    """Get database statistics, plus GitHub storage status under ``github``.

    If the GitHub status lookup raises, the ``github`` entry carries the
    error text instead. Requires the admin secret (401 otherwise).
    """
    if not _check_admin_auth():
        return jsonify({"error": "Unauthorized"}), 401

    report = db.get_stats()
    try:
        from github_storage import get_github_storage
        github_info = get_github_storage().get_status()
    except Exception as exc:
        github_info = {"error": str(exc)}
    report["github"] = github_info
    return jsonify(report)
@app.route('/github_status', methods=['GET'])
def github_status():
    """Check GitHub storage status and API rate limits.

    Requires the admin secret (401 otherwise); any exception raised while
    fetching the status is returned as a 500 with the error text.
    """
    if not _check_admin_auth():
        return jsonify({"error": "Unauthorized"}), 401

    try:
        from github_storage import get_github_storage
        storage = get_github_storage()
        current = storage.get_status()
        return jsonify(current)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint. No auth required.

    Reports the record count of every store in ``db.STORES`` alongside fixed
    version/persistence metadata.
    """
    record_counts = {name: db.count(name) for name in db.STORES}
    body = {
        "status": "healthy",
        "records": record_counts,
        "version": "2.0-github-only",
        "persistence": "github",
        "local_files": False
    }
    return jsonify(body)
@app.route('/backup_status', methods=['GET'])
def backup_status():
    """Detailed backup status. No auth required.

    Lists, per store, its GitHub file name and in-memory record count, and
    reports whether GitHub storage is configured together with its rate
    limit. Failures while querying GitHub storage are ignored and leave
    whatever values were gathered up to that point.
    """
    stores = {
        name: {
            "github_file": db.STORE_FILES.get(name, ""),
            "records_in_memory": db.count(name)
        }
        for name in db.STORES
    }

    configured, rate_limit = False, None
    try:
        from github_storage import get_github_storage
        storage = get_github_storage()
        configured = storage._configured
        if configured:
            rate_limit = storage.get_status().get("rate_limit")
    except Exception:
        # Best effort: the status page must still render without GitHub.
        pass

    return jsonify({
        "status": "ok",
        "github_configured": configured,
        "github_rate_limit": rate_limit,
        "stores": stores,
        "stats": db.get_stats()
    })
# โ”€โ”€โ”€ Graceful shutdown โ”€โ”€โ”€
@atexit.register
def _on_shutdown():
    """Interpreter-exit hook: shut the database down and print a backup reminder.

    Registered through ``atexit.register`` used as a decorator, which returns
    the function unchanged.
    """
    for line_no, text in enumerate((
        "\n๐Ÿ›‘ Application shutting down...",
        "โš ๏ธ Data is in memory only! Push to GitHub: /backup_db",
    )):
        print(text)
        if line_no == 0:
            # The database is shut down between the two messages.
            db.shutdown()
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# MAIN
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Development entry point: print a startup banner, then run the Socket.IO
# development server with the host/port/debug settings from config.
if __name__ == '__main__':
    print("\n" + "โ•" * 60)
    print(" ๐ŸŽ“ CORVO AI - High Performance Teaching Platform")
    print("โ•" * 60)
    print()
    print(" ๐Ÿ“‹ Architecture:")
    print(" โ”œโ”€โ”€ memory_db โ†’ Pure RAM database")
    print(" โ”œโ”€โ”€ github_storage โ†’ GitHub persistence (manual)")
    print(" โ”œโ”€โ”€ http_pool โ†’ Connection pooling for APIs")
    print(" โ”œโ”€โ”€ auth/ โ†’ Login, signup, sessions")
    print(" โ”œโ”€โ”€ chat/ โ†’ Text AI chat")
    print(" โ”œโ”€โ”€ board/ โ†’ Interactive whiteboard")
    print(" โ”œโ”€โ”€ exam/ โ†’ MCQ exam generation")
    print(" โ”œโ”€โ”€ media/ โ†’ Voice & image processing")
    print(" โ”œโ”€โ”€ market/ โ†’ Subject purchasing & cards")
    print(" โ”œโ”€โ”€ websocket/ โ†’ Real-time sessions")
    print(" โ””โ”€โ”€ subjects/ โ†’ Subject definitions")
    print()
    print(" ๐Ÿ’พ STORAGE:")
    print(" โœ… All data in RAM (zero disk I/O)")
    print(" โœ… GitHub backup (manual via /backup_db)")
    print(" โœ… NO local JSON files needed")
    print(" โœ… Loads from GitHub on startup")
    print()
    print(" ๐Ÿ”’ ADMIN ENDPOINTS:")
    print(f" GET /health โ†’ Health check")
    print(f" GET /backup_status โ†’ Backup info")
    print(f" GET /backup_db โ†’ Push ALL to GitHub")
    print(f" GET /backup_db_single โ†’ Push ONE store")
    print(f" GET /restore_db โ†’ Pull ALL from GitHub")
    print(f" GET /db_stats โ†’ Database stats")
    print(f" GET /github_status โ†’ GitHub API info")
    # Never echo the real admin secret: stdout normally ends up in container
    # or service logs, where it would be readable by anyone with log access.
    print(" (admin routes need ?secret=<ADMIN_SECRET> or the X-Admin-Secret header)")
    print()
    print(f" ๐ŸŒ Open: http://localhost:{PORT}")
    print("โ•" * 60 + "\n")
    socketio.run(app, debug=DEBUG, host=HOST, port=PORT)
================================================================================
FILE 2: auth\__init__.py
FULL PATH: main_project\auth\__init__.py
================================================================================
"""
Authentication package.
"""
================================================================================
FILE 3: auth\helpers.py
FULL PATH: main_project\auth\helpers.py
================================================================================
"""
Session and authentication helpers.
Updated to use MemoryDB for fast lookups.
"""
import uuid
from flask import session
from memory_db import get_db
# Active WebSocket connections per user
active_connections = {}
def generate_session_id():
    """Return a new random (version 4) UUID rendered as a string."""
    token = uuid.uuid4()
    return str(token)
def is_session_valid():
    """Return True when the cookie session matches the stored user session.

    The session must hold both ``username`` and ``session_id``, the user must
    exist in the ``users`` store, and the stored ``session_id`` must equal the
    one in the cookie session.
    """
    if not ('username' in session and 'session_id' in session):
        return False

    claimed_id = session['session_id']
    record = get_db().read_key('users', session['username'])
    if not record:
        return False
    return record.get('session_id') == claimed_id
def notify_logout(username, socketio, exclude_sid=None):
    """Emit ``force_logout`` to every tracked connection of ``username``.

    Connections are looked up in ``active_connections``; the connection whose
    sid equals ``exclude_sid`` is skipped. Does nothing for unknown users.
    """
    if username not in active_connections:
        return

    payload_for = lambda: {
        'message': 'ู„ู‚ุฏ ุชู… ุชุณุฌูŠู„ ุฎุฑูˆุฌูƒ ู„ุฃู† ุดุฎุตุงู‹ ุขุฎุฑ ู‚ุงู… ุจุชุณุฌูŠู„ ุงู„ุฏุฎูˆู„.'
    }
    for connection_id in active_connections[username]:
        if connection_id == exclude_sid:
            continue
        socketio.emit('force_logout', payload_for(), room=connection_id)
================================================================================
FILE 4: auth\routes.py
FULL PATH: main_project\auth\routes.py
================================================================================
"""
Authentication routes: signup, login, logout, verification.
Updated to use MemoryDB for atomic operations.
"""
from flask import Blueprint, render_template, request, redirect, url_for, session, flash
from datetime import datetime
from auth.helpers import is_session_valid, generate_session_id, notify_logout, active_connections
from memory_db import get_db
from database.telegram import load_telegram_db, save_telegram_db
auth_bp = Blueprint('auth', __name__)
# Socket.IO server used to push forced-logout events; None until injected.
_socketio = None


def init_auth_socketio(socketio):
    """Store the application's Socket.IO server for use by the auth routes."""
    global _socketio
    server = socketio
    _socketio = server
@auth_bp.route('/signup', methods=['GET', 'POST'])
def signup():
    """Show the signup form and validate a submitted one.

    On POST the fields are checked in a fixed order (all present, valid
    student type, username length, matching passwords, password length,
    username not already taken); the first failure is flashed and the form
    re-rendered. On success the pending account details are stashed in the
    session and the user is redirected to the verification step.

    NOTE(review): the raw password is placed in the session here — confirm
    the session storage in use is acceptable for that.
    """
    if request.method != 'POST':
        return render_template('signup.html')

    form = request.form
    username = form.get('username', '').strip()
    password = form.get('password', '')
    confirm_password = form.get('confirm_password', '')
    student_type = form.get('student_type', '').strip()

    # The elif chain preserves the original short-circuit order; the database
    # is consulted only when every cheaper check has passed.
    problem = None
    if not username or not password or not confirm_password:
        problem = 'ุฌู…ูŠุน ุงู„ุญู‚ูˆู„ ู…ุทู„ูˆุจุฉ!'
    elif not student_type or student_type not in ['ุฃุฏุจูŠ', 'ุนู„ู…ูŠ']:
        problem = 'ูŠุฑุฌู‰ ุงุฎุชูŠุงุฑ ู†ูˆุน ุงู„ุฏุฑุงุณุฉ!'
    elif len(username) < 3:
        problem = 'ุงุณู… ุงู„ู…ุณุชุฎุฏู… ู‚ุตูŠุฑ ุฌุฏุงู‹!'
    elif password != confirm_password:
        problem = 'ูƒู„ู…ุงุช ุงู„ู…ุฑูˆุฑ ุบูŠุฑ ู…ุชุทุงุจู‚ุฉ!'
    elif len(password) < 6:
        problem = 'ูƒู„ู…ุฉ ุงู„ู…ุฑูˆุฑ ู‚ุตูŠุฑุฉ ุฌุฏุงู‹ (6 ุฃุญุฑู ุนู„ู‰ ุงู„ุฃู‚ู„)!'
    elif get_db().has_key('users', username):
        problem = 'ุงุณู… ุงู„ู…ุณุชุฎุฏู… ู…ูˆุฌูˆุฏ ู…ุณุจู‚ุงู‹!'

    if problem is not None:
        flash(problem, 'error')
        return render_template('signup.html')

    session['temp_username'] = username
    session['temp_password'] = password
    session['temp_student_type'] = student_type
    return redirect(url_for('auth.verification'))
@auth_bp.route('/verification', methods=['GET', 'POST'])
def verification():
    """Complete signup by matching a verification code from the telegram store.

    Requires the pending signup data placed in the session by ``signup``;
    otherwise redirects back to signup. On POST, the submitted code is
    matched against unused codes in the ``telegram`` store. A match marks
    that record as used, creates the user, logs them in and redirects to
    the dashboard; any failure flashes an error.

    The username is re-checked for availability here, immediately before
    the account is written: the check in ``signup`` can be stale by now, and
    writing unconditionally would overwrite an account registered in the
    meantime.
    """
    if 'temp_username' not in session:
        flash('ูŠุฑุฌู‰ ุจุฏุก ุนู…ู„ูŠุฉ ุงู„ุชุณุฌูŠู„ ู…ู† ุงู„ุจุฏุงูŠุฉ.', 'error')
        return redirect(url_for('auth.signup'))

    if request.method == 'POST':
        code = request.form.get('code', '').strip()
        if not code:
            flash('ูŠุฑุฌู‰ ุฅุฏุฎุงู„ ุฑู…ุฒ ุงู„ุชุญู‚ู‚!', 'error')
            return render_template('verification.html')

        db = get_db()
        username = session['temp_username']

        # Refuse to clobber an existing account. Done before the code is
        # consumed so the visitor can restart signup with the same code.
        if db.has_key('users', username):
            for stale_key in ('temp_username', 'temp_password', 'temp_student_type'):
                session.pop(stale_key, None)
            flash('ุงุณู… ุงู„ู…ุณุชุฎุฏู… ู…ูˆุฌูˆุฏ ู…ุณุจู‚ุงู‹!', 'error')
            return redirect(url_for('auth.signup'))

        # Tolerate a store that reads back as None.
        telegram_db = db.read('telegram') or {}
        user_id = None
        user_found = False
        for uid, user_data in telegram_db.items():
            # "DONE" marks a code that was already used and can never match.
            if user_data.get('code') == code and user_data.get('code') != "DONE":
                user_found = True
                user_id = uid
                break

        if not user_found:
            flash('ุฑู…ุฒ ุงู„ุชุญู‚ู‚ ุบูŠุฑ ุตุญูŠุญ ุฃูˆ ุชู… ุงุณุชุฎุฏุงู…ู‡!', 'error')
            return render_template('verification.html')

        # Mark the telegram record as consumed.
        db.update_key('telegram', user_id, lambda t: {
            **(t or {}),
            'code': "DONE",
            'status': "verified",
            'verified_at': datetime.now().isoformat()
        })

        new_session_id = generate_session_id()
        password = session['temp_password']
        student_type = session['temp_student_type']

        # Create the user record.
        # NOTE(review): the password is stored exactly as submitted.
        db.write('users', username, {
            "username": username,
            "password": password,
            "student_type": student_type,
            "telegram_user_id": user_id,
            "session_id": new_session_id,
            "created_at": datetime.now().isoformat(),
            "verified": True,
            "last_login": datetime.now().isoformat(),
            "balance": 0,
            "purchased_subjects": ['islamic']
        })

        # Swap the pending-signup keys for a real logged-in session.
        session.pop('temp_username', None)
        session.pop('temp_password', None)
        session.pop('temp_student_type', None)
        session['username'] = username
        session['user_id'] = user_id
        session['session_id'] = new_session_id
        flash('ุชู… ุฅู†ุดุงุก ุงู„ุญุณุงุจ ูˆุชุณุฌูŠู„ ุงู„ุฏุฎูˆู„ ุจู†ุฌุงุญ!', 'success')
        return redirect(url_for('dashboard'))

    return render_template('verification.html')
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate a submitted one.

    On a successful POST a new session id is generated, the user's other
    tracked connections are told to log out, the stored session id and
    last-login time are updated, and the user is redirected to the
    dashboard. A failed attempt flashes an error and re-renders the form.

    A user record without a string ``password`` field is treated as a
    failed login rather than raising, and the password comparison is
    constant-time.
    """
    if request.method == 'POST':
        import hmac  # local import: keeps this block self-contained

        username = request.form.get('username', '').strip()
        password = request.form.get('password', '')
        db = get_db()
        user_data = db.read_key('users', username)

        # NOTE(review): the stored value is compared to the submitted password
        # directly, i.e. passwords are kept unhashed in the users store.
        stored_password = user_data.get('password') if user_data else None
        credentials_ok = isinstance(stored_password, str) and hmac.compare_digest(
            # compare_digest rejects non-ASCII str, so compare encoded bytes.
            stored_password.encode('utf-8'), password.encode('utf-8')
        )

        if credentials_ok:
            new_session_id = generate_session_id()
            if _socketio:
                notify_logout(username, _socketio)
            # Rotate the stored session id so older cookie sessions stop validating.
            db.update_key('users', username, lambda u: {
                **(u or {}),
                'session_id': new_session_id,
                'last_login': datetime.now().isoformat()
            })
            session['username'] = username
            session['user_id'] = user_data.get('telegram_user_id')
            session['session_id'] = new_session_id
            flash('ุชู… ุชุณุฌูŠู„ ุงู„ุฏุฎูˆู„ ุจู†ุฌุงุญ!', 'success')
            return redirect(url_for('dashboard'))
        else:
            flash('ุงุณู… ุงู„ู…ุณุชุฎุฏู… ุฃูˆ ูƒู„ู…ุฉ ุงู„ู…ุฑูˆุฑ ุฎุทุฃ!', 'error')
    return render_template('login.html')
@auth_bp.route('/logout')
def logout():
    """Log the current user out and redirect to the login page.

    For a session that names a user, that user's tracked connection list is
    emptied and the stored session id is set to None. The cookie session is
    cleared in every case.
    """
    if 'username' in session:
        who = session['username']
        if who in active_connections:
            active_connections[who] = []

        def _drop_session_id(record):
            # Same record with the session id blanked out.
            return {**(record or {}), 'session_id': None}

        get_db().update_key('users', who, _drop_session_id)

    session.clear()
    flash('ุชู… ุชุณุฌูŠู„ ุงู„ุฎุฑูˆุฌ.', 'success')
    return redirect(url_for('auth.login'))
@auth_bp.route('/forgot-password')
def forgot_password():
    """Render the forgot-password page."""
    page = 'forgot_password.html'
    return render_template(page)
================================================================================
FILE 5: board\__init__.py
FULL PATH: main_project\board\__init__.py
================================================================================
"""
Board package.
Interactive whiteboard with AI, TTS, icons, and page images.
"""
================================================================================
FILE 6: board\engine.py
FULL PATH: main_project\board\engine.py
================================================================================
"""
Board AI Engine - STREAMING VERSION.
Updated to use HTTP connection pools for 200 concurrent users.
"""
import re
import json
from config import GPT_URL, MAX_CHAT_HISTORY, GPT_TIMEOUT
from http_pool import gpt_session
from board.tts import TTSEngine
from board.icons import IconResolver
from board.pages import resolve_page_tags
from subjects.loader import subject_loader
from json_processor import BoardProcessor
class BoardEngine:
"""
Board AI Engine with streaming segment delivery.
Each board+voice pair is processed and yielded independently.
"""
def __init__(self):
    """Create the processing helpers and the empty per-user session table."""
    import threading

    self.gpt_url = GPT_URL
    # Collaborators used by the board pipeline.
    self.board_processor = BoardProcessor()
    self.tts_engine = TTSEngine()
    self.icon_resolver = IconResolver()
    # Per-user state, keyed by username and guarded by a lock.
    self._user_sessions = {}
    self._sessions_lock = threading.Lock()
    print("โœ… BoardEngine initialized (streaming mode + connection pooling)")
# โ”€โ”€โ”€ User session management (thread-safe) โ”€โ”€โ”€
def _get_user_session(self, username):
with self._sessions_lock:
if username not in self._user_sessions:
self._user_sessions[username] = {
"subject_id": None,
"conversation_history": [],
"last_sequence": [],
}
return self._user_sessions[username]
def set_subject(self, username, subject_id):
with self._sessions_lock:
if username not in self._user_sessions:
self._user_sessions[username] = {
"subject_id": None,
"conversation_history": [],
"last_sequence": [],
}
us = self._user_sessions[username]
if us["subject_id"] and us["subject_id"] != subject_id:
us["conversation_history"] = []
us["last_sequence"] = []
print(f" ๐Ÿ”„ Board subject switched: {us['subject_id']} โ†’ {subject_id} for {username}")
us["subject_id"] = subject_id
print(f" ๐Ÿ“‹ Board subject set: {subject_id} for {username}")
def get_subject(self, username):
with self._sessions_lock:
us = self._user_sessions.get(username, {})
return us.get("subject_id")
# โ”€โ”€โ”€ GPT call (uses connection pool) โ”€โ”€โ”€
def _call_gpt5(self, user_message, system_prompt, temperature=0.7, max_tokens=4000):
    """POST one request to the GPT endpoint through the pooled session.

    The system prompt is sent as the only ``chat_history`` entry. Returns the
    ``assistant_response`` field of the JSON reply (``""`` when absent), or
    None when the request or decoding raises.
    """
    system_turn = {"role": "system", "content": system_prompt}
    payload = dict(
        user_input=user_message,
        chat_history=[system_turn],
        temperature=temperature,
        top_p=0.95,
        max_completion_tokens=max_tokens,
    )
    try:
        reply = gpt_session.post(self.gpt_url, json=payload, timeout=GPT_TIMEOUT)
        reply.raise_for_status()
        decoded = reply.json()
        return decoded.get("assistant_response", "")
    except Exception as e:
        print(f" โŒ GPT error: {e}")
        return None
# โ”€โ”€โ”€ Chat history formatting โ”€โ”€โ”€
def _format_chat_history(self, username):
us = self._get_user_session(username)
history = us.get("conversation_history", [])
if not history:
return ""
recent = history[-10:]
parts = []
for msg in recent:
if msg["role"] == "user":
parts.append(f"ุงู„ุทุงู„ุจ: {msg['content']}")
elif msg["role"] == "assistant":
parts.append(f"ุงู„ู…ุฏุฑุณ: {msg['content']}")
return (
"\n\nโ•โ•โ• ุณุฌู„ ุงู„ู…ุญุงุฏุซุฉ ุงู„ุณุงุจู‚ุฉ โ•โ•โ•\n"
+ "\n".join(parts)
+ "\nโ•โ•โ• ู†ู‡ุงูŠุฉ ุงู„ุณุฌู„ โ•โ•โ•"
)
# โ”€โ”€โ”€ Step 1: Route message โ”€โ”€โ”€
def _route_message(self, user_message, username):
    """Pick which subject file should be used to answer ``user_message``.

    Falls back to ``"main.txt"`` when the user has no subject, the subject
    data cannot be loaded, the model call returns nothing, or the model's
    answer names no known file. Otherwise a routing prompt (file list,
    ``structure.txt`` contents, recent chat history, the message) is sent to
    the model and the first valid file name found in its reply is returned.
    """
    us = self._get_user_session(username)
    subject_id = us["subject_id"]
    if not subject_id:
        return "main.txt"
    subject_data = subject_loader.load(subject_id)
    if not subject_data:
        return "main.txt"
    structure = subject_data.get("structure.txt", "")
    p_files = subject_data.get("_p_files", [])
    chat_history_text = self._format_chat_history(username)
    # One line per file from "_p_files", numbered from 1 in list order.
    p_files_desc = "\n".join([f"- {f}: ุงู„ูุตู„ {i+1}" for i, f in enumerate(p_files)])
    routing_prompt = f"""ุฃู†ุช ู†ุธุงู… ุชูˆุฌูŠู‡ ุฐูƒูŠ ู„ู…ุณุงุนุฏ ุชุนู„ูŠู…ูŠ.
ู…ู‡ู…ุชูƒ: ุชุญู„ูŠู„ ุฑุณุงู„ุฉ ุงู„ุทุงู„ุจ ูˆุงุฎุชูŠุงุฑ ุงู„ู…ู„ู ุงู„ู…ู†ุงุณุจ ู„ู„ุฑุฏ.
ุงู„ู…ู„ูุงุช ุงู„ู…ุชุงุญุฉ:
- main.txt: ู„ู„ุชุญูŠุงุชุŒ ุงู„ุฃุณุฆู„ุฉ ุงู„ุนุงู…ุฉุŒ ุฃูŠ ุดูŠุก ู„ุง ูŠุชุนู„ู‚ ุจูุตู„ ู…ุญุฏุฏ
{p_files_desc}
ูู‡ุฑุณ ุงู„ูƒุชุงุจ (ู„ู„ู…ุณุงุนุฏุฉ ููŠ ุงู„ุชูˆุฌูŠู‡):
{structure}
{chat_history_text}
ุชุนู„ูŠู…ุงุช:
1. ุฅุฐุง ูƒุงู†ุช ุงู„ุฑุณุงู„ุฉ ุชุญูŠุฉ ุฃูˆ ุณุคุงู„ ุนุงู… โ†’ main.txt
2. ุฅุฐุง ูƒุงู†ุช ุชุณุฃู„ ุนู† ู…ูˆุถูˆุน ููŠ ูุตู„ ู…ุญุฏุฏ โ†’ ุงู„ู…ู„ู ุงู„ู…ู†ุงุณุจ
3. ุงุณุชุฎุฏู… ุงู„ูู‡ุฑุณ ู„ุชุญุฏูŠุฏ ุงู„ูุตู„ ุงู„ุตุญูŠุญ
4. ู…ู‡ู… ุฌุฏุงู‹: ุฅุฐุง ู‚ุงู„ ุงู„ุทุงู„ุจ "ุงุดุฑุญ ุฃูƒุซุฑ" ุฃูˆ "ูˆุถุญ" ุฃูˆ ุฃูŠ ุทู„ุจ ู…ุชุงุจุนุฉุŒ ุงุฑุฌุน ู„ุณุฌู„ ุงู„ู…ุญุงุฏุซุฉ ู„ุชุนุฑู ุงู„ู…ูˆุถูˆุน ุงู„ุญุงู„ูŠ ูˆุงุฎุชุฑ ู†ูุณ ุงู„ู…ู„ู
5. ุฃุฌุจ ูู‚ุท ุจุงุณู… ุงู„ู…ู„ู (ู…ุซุงู„: p1.txt ุฃูˆ main.txt) ุจุฏูˆู† ุฃูŠ ูƒู„ุงู… ุฅุถุงููŠ
ุฑุณุงู„ุฉ ุงู„ุทุงู„ุจ: {user_message}
ุงู„ู…ู„ู ุงู„ู…ู†ุงุณุจ:"""
    # Low temperature and a small token budget: the reply should be a bare file name.
    chosen = self._call_gpt5(
        user_message, routing_prompt,
        temperature=0.2, max_tokens=50
    )
    if not chosen:
        return "main.txt"
    chosen = chosen.strip().lower()
    valid_files = ["main.txt"] + p_files
    # Substring match, checked in list order ("main.txt" first).
    # NOTE(review): the reply is lower-cased but the file names are not, so a
    # name containing upper-case letters can never match — confirm that
    # "_p_files" entries are always lower-case.
    for v in valid_files:
        if v in chosen:
            return v
    return "main.txt"
# โ”€โ”€โ”€ Step 2: Generate XML response โ”€โ”€โ”€
def _generate_xml_response(self, user_message, chosen_file, username):
    """Ask the model for a board/voice XML answer to ``user_message``.

    Returns None when the user has no subject or the subject data cannot be
    loaded. Otherwise builds a system prompt from the fixed instructions
    below, the contents of ``chosen_file`` (empty string if that key is
    missing), the recent chat history and the message itself, and returns
    whatever ``_call_gpt5`` returns (which may also be None).
    """
    us = self._get_user_session(username)
    subject_id = us["subject_id"]
    if not subject_id:
        return None
    subject_data = subject_loader.load(subject_id)
    if not subject_data:
        return None
    file_content = subject_data.get(chosen_file, "")
    chat_history_text = self._format_chat_history(username)
    # The prompt literal is runtime text sent to the model; it names the
    # <board>, <voice>, <note>, <text>, <shape>, <svg> and <page> tags.
    system_prompt = f"""ุงู†ุชูŠ ู…ุฏุฑุณุฉ ุฎุจูŠุฑุฉ ูˆู…ุญุชุฑูุฉ ููŠ ู‡ุฐู‡ ุงู„ู…ุงุฏุฉ. ุชุดุฑุญ ุนู„ู‰ ุณุจูˆุฑุฉ ุชูุงุนู„ูŠุฉ ุฑู‚ู…ูŠุฉ.
โ•โ•โ• ุตูŠุบุฉ ุงู„ุฑุฏ โ•โ•โ•
ูŠุฌุจ ุฃู† ุชุฑุฏูŠ ุจุตูŠุบุฉ XML ุฎุงุตุฉ ุชุญุชูˆูŠ ุนู„ู‰:
1. <board>ุนู†ุงุตุฑ ุงู„ุณุจูˆุฑุฉ</board> - ู…ุง ุณูŠูุฑุณู…/ูŠูุถุงู ุนู„ู‰ ุงู„ุณุจูˆุฑุฉ ุฃูˆู„ุงู‹
2. <voice>ู†ุต ุงู„ูƒู„ุงู…</voice> - ุงู„ู†ุต ุงู„ุฐูŠ ุณูŠูู‚ุฑุฃ ุจุตูˆุช ุนุงู„ู ู„ู„ุทุงู„ุจ ุจุนุฏ ุฑุณู… ุงู„ุนู†ุงุตุฑ (ุนุฑุจูŠ ุทุจูŠุนูŠ)
โ•โ•โ• ุนู†ุงุตุฑ ุงู„ุณุจูˆุฑุฉ ุงู„ู…ุชุงุญุฉ (ุฏุงุฎู„ <board>) โ•โ•โ•
โ€ข <note>ู…ุญุชูˆู‰ ุงู„ู…ู„ุงุญุธุฉ</note>
โ€ข <text>ู†ุต ู…ุจุงุดุฑ ุนู„ู‰ ุงู„ุณุจูˆุฑุฉ ุจุฏูˆู† ุฎู„ููŠุฉ</text>
โ€ข <shape type="ู†ูˆุน_ุงู„ุดูƒู„"/>
ุงู„ุฃู†ูˆุงุน: circle, triangle, star, arrow-right, arrow-left, arrow-up, arrow-down,
rectangle, diamond, hexagon, square, oval, arrow-double-h, checkmark, cross,
heart, cloud, lightning, speech, process, decision
โ€ข <svg>ูƒู„ู…ุฉ_ุจุญุซ_ุจุงู„ุฅู†ุฌู„ูŠุฒูŠุฉ</svg>
ุณูŠุชู… ุงู„ุจุญุซ ุนู† ุฃูŠู‚ูˆู†ุฉ ู…ุฑุณูˆู…ุฉ ูŠุฏูˆูŠุงู‹ (ู…ุซู„: ball, car, force, spring, weight, rope, pulley)
โ€ข <page>ุฑู‚ู…_ุงู„ุตูุญุฉ</page>
ู„ุนุฑุถ ุตูุญุฉ ู…ุญุฏุฏุฉ ู…ู† ุงู„ูƒุชุงุจ ูƒุตูˆุฑุฉ ุนู„ู‰ ุงู„ุณุจูˆุฑุฉ
ู…ุซุงู„: <page>12</page> ู„ุนุฑุถ ุงู„ุตูุญุฉ 12 ู…ู† ุงู„ูƒุชุงุจ
ุงุณุชุฎุฏู…ู‡ุง ุนู†ุฏู…ุง ุชุญุชุงุฌ ุชุนุฑุถ ู„ู„ุทุงู„ุจ ุตูุญุฉ ู…ุนูŠู†ุฉ ู…ู† ุงู„ูƒุชุงุจ
โ•โ•โ• ู‚ูˆุงุนุฏ ู…ู‡ู…ุฉ ุฌุฏุงู‹ โ•โ•โ•
1. ุงุดุฑุญ ุฎุทูˆุฉ ุจุฎุทูˆุฉ: ุงุจุฏุฃ ุจู€ <board> ุซู… <voice> ุซู… <board> ุซู… <voice> ูˆู‡ูƒุฐุง
2. ุงุฌุนู„ ุงู„ุดุฑุญ ู…ุชุฏุฑุฌุงู‹ ูƒุฃู†ูƒ ุชุดุฑุญ ุนู„ู‰ ุณุจูˆุฑุฉ ุญู‚ูŠู‚ูŠุฉ ุฃู…ุงู… ุงู„ุทู„ุงุจ
3. <board> = ู…ุง ูŠุธู‡ุฑ ุนู„ู‰ ุงู„ุณุจูˆุฑุฉ ุฃูˆู„ุงู‹ (ู…ู„ุงุญุธุงุชุŒ ู†ุตูˆุตุŒ ุฃุดูƒุงู„ุŒ ุตูˆุฑุŒ ุตูุญุงุช ุงู„ูƒุชุงุจ)
4. <voice> = ุงู„ูƒู„ุงู… ุงู„ู…ุณู…ูˆุน ุจุนุฏ ุฑุณู… ุงู„ุนู†ุงุตุฑ (ุทุจูŠุนูŠุŒ ูˆุฏูˆุฏุŒ ูˆุงุถุญุŒ ูŠุดุฑุญ ู…ุง ุชู… ุฑุณู…ู‡)
5. ู„ุง ุชุถุน ูƒู„ ุดูŠุก ุฏูุนุฉ ูˆุงุญุฏุฉ - ุงุฌุนู„ู‡ ุชุณู„ุณู„ูŠุงู‹
7. <svg> ูู‚ุท ุจูƒู„ู…ุงุช ุฅู†ุฌู„ูŠุฒูŠุฉ ุจุณูŠุทุฉ ูˆู…ุนุจุฑุฉ
8. ุงู„ุณุจูˆุฑุฉ ุชุนู…ู„ ุจู†ุธุงู… ุงู„ุฅุถุงูุฉ - ุงู„ุนู†ุงุตุฑ ุงู„ุณุงุจู‚ุฉ ุชุจู‚ู‰
9. ุงุณุชุฎุฏู… <text> ู„ู„ุนู†ุงูˆูŠู† ูˆุงู„ู…ุนุงุฏู„ุงุช ุงู„ู…ู‡ู…ุฉ (ุจุฏูˆู† ุฎู„ููŠุฉ)
10. ุงุณุชุฎุฏู… <note> ู„ู„ุชูˆุถูŠุญุงุช ูˆุงู„ู…ู„ุงุญุธุงุช (ู…ุน ุฎู„ููŠุฉ ู…ู„ูˆู†ุฉ)
11. ู„ุง ุชุณุชุฎุฏู… ุฃูƒุซุฑ ู…ู† 3-4 ุนู†ุงุตุฑ ููŠ ูƒู„ <board>
12. ุงุฌุนู„ ุงู„ู†ุต ููŠ <voice> ุทุจูŠุนูŠุงู‹ ูƒุฃู†ูƒ ุชุชุญุฏุซ ู…ุน ุทุงู„ุจ ูˆูŠุดุฑุญ ู…ุง ุชู… ุฑุณู…ู‡ ุนู„ู‰ ุงู„ุณุจูˆุฑุฉ
13. ุงุฑุณู… ุฃูˆู„ุงู‹ ุซู… ุชูƒู„ู… - ู‡ุฐุง ู…ู‡ู… ุฌุฏุงู‹!
14. ุฑุงุฌุน ุณุฌู„ ุงู„ู…ุญุงุฏุซุฉ ุงู„ุณุงุจู‚ุฉ ู„ุชุนุฑู ู…ุง ุชู… ุดุฑุญู‡ ูˆุชูƒู…ู„ ู…ู† ุญูŠุซ ุชูˆู‚ูุช - ู„ุง ุชูƒุฑุฑ ู…ุง ู‚ู„ุชู‡ ุณุงุจู‚ุงู‹
15. ุงุณุชุฎุฏู… <page> ุนู†ุฏู…ุง ุชุฑูŠุฏ ุนุฑุถ ุตูุญุฉ ู…ู† ุงู„ูƒุชุงุจ - ู…ุซู„ุงู‹ ุฅุฐุง ุงู„ุทุงู„ุจ ุณุฃู„ ุนู† ุชู…ุฑูŠู† ุฃูˆ ุดูƒู„ ููŠ ุตูุญุฉ ู…ุนูŠู†ุฉ
when user talk about something not about the subject or something funny etc... you can actually answer without the board just VOICE and be funny smart perfect girl also:
when you explain something dont make all your explain on the NOTE make the note for important point use the TEXT direct on the board and the ICONS/SHAPES
โ•โ•โ• ู…ุญุชูˆู‰ ุงู„ู…ุงุฏุฉ โ•โ•โ•
{file_content}
{chat_history_text}
โ•โ•โ• ุงู„ุขู† ุฃุฌุจ ุนู„ู‰ ุณุคุงู„ ุงู„ุทุงู„ุจ โ•โ•โ•
ุฑุณุงู„ุฉ ุงู„ุทุงู„ุจ: {user_message}"""
    # The message is passed both inside the system prompt and as the user input.
    response = self._call_gpt5(
        user_message, system_prompt,
        temperature=0.8, max_tokens=4000
    )
    return response
# โ”€โ”€โ”€ Resolve tags โ”€โ”€โ”€
def _resolve_svg_tags(self, xml_text):
if not xml_text:
return xml_text
return self.icon_resolver.resolve_all_in_xml(xml_text)
def _resolve_page_tags(self, xml_text, username):
if not xml_text:
return xml_text
us = self._get_user_session(username)
subject_id = us.get("subject_id")
if not subject_id:
return xml_text
base_url = subject_loader.get_pages_base_url(subject_id)
if not base_url:
print(f" โš ๏ธ No pages_base_url for subject {subject_id}")
return xml_text
return resolve_page_tags(xml_text, base_url)
# โ”€โ”€โ”€ Parse XML into raw segments โ”€โ”€โ”€
def _parse_xml_to_raw_segments(self, xml_response):
if not xml_response:
return []
pattern = r'<(voice|board)>(.*?)</\1>'
matches = list(re.finditer(pattern, xml_response, re.DOTALL))
if not matches:
cleaned = re.sub(r'<[^>]+>', '', xml_response).strip()
if cleaned:
return [{"boards": [], "voice": cleaned}]
return []
groups = []
current_boards = []
for match in matches:
tag_type = match.group(1)
content = match.group(2).strip()
if tag_type == "board":
current_boards.append(content)
elif tag_type == "voice":
cleaned_voice = re.sub(r'<[^>]+>', '', content).strip()
groups.append({
"boards": list(current_boards),
"voice": cleaned_voice
})
current_boards = []
if current_boards:
groups.append({
"boards": list(current_boards),
"voice": ""
})
return groups
# โ”€โ”€โ”€ Process a single board content into items โ”€โ”€โ”€
def _process_single_board(self, board_content, current_board_state):
existing_json_str = json.dumps(
current_board_state, ensure_ascii=False, indent=2
)
processor_input = (
f"BOARD NOW (make sure no X Y error):\n"
f"{existing_json_str}\n\n"
f"new board need to add :\n"
f"<board>{board_content}</board>"
)
print(f" ๐Ÿ”ง Sending to json_processor...")
print(f" Current board items: {len(current_board_state)}")
try:
json_text = self.board_processor.convert_xml_to_json(processor_input)
if json_text:
new_items = json.loads(json_text)
if isinstance(new_items, list) and new_items:
added_items = []
existing_ids = set()
for item in current_board_state:
item_key = json.dumps(item, sort_keys=True, ensure_ascii=False)
existing_ids.add(item_key)
for item in new_items:
item_key = json.dumps(item, sort_keys=True, ensure_ascii=False)
if item_key not in existing_ids:
added_items.append(item)
current_board_state.extend(added_items)
print(f" โœ… json_processor: {len(new_items)} total, {len(added_items)} new")
return added_items, current_board_state
elif isinstance(new_items, dict):
current_board_state.append(new_items)
print(f" โœ… json_processor: 1 item")
return [new_items], current_board_state
else:
print(f" โš ๏ธ json_processor: unexpected format")
return [], current_board_state
except json.JSONDecodeError as e:
print(f" โŒ json_processor invalid JSON: {e}")
return [], current_board_state
except Exception as e:
print(f" โŒ json_processor error: {e}")
return [], current_board_state
return [], current_board_state
# โ”€โ”€โ”€ STREAMING PIPELINE (generator) โ”€โ”€โ”€
def process_message_stream(self, user_message, username, frontend_board_state=None):
    """Run the board pipeline for one message and yield JSON event strings.

    Steps: route the message, generate the XML answer, resolve ``<page>``
    and ``<svg>`` tags, split the XML into segment groups, then for each
    group build board items and synthesize the voice text.

    Yields (each as a JSON string):
        * ``{"type": "error", "message": ...}`` and stops, when no subject
          is set, no XML was generated, or no segment group was found.
        * ``{"type": "segment", ...}`` once per segment group.
        * ``{"type": "done", ...}`` last, carrying the final board state.

    After the last segment the user message and the joined voice texts are
    appended to the session's conversation history (trimmed to
    ``MAX_CHAT_HISTORY`` entries) and the segments are stored as
    ``last_sequence``.
    """
    def _error_event(message):
        return json.dumps({"type": "error", "message": message}, ensure_ascii=False)

    session_data = self._get_user_session(username)
    subject_id = session_data.get("subject_id")
    rule = 'โ•' * 60

    print(f"\n{rule}")
    print(f" ๐Ÿ‘ค Student ({username}): {user_message}")
    print(f" ๐Ÿ“š Subject: {subject_id}")
    print(f"{rule}")

    if not subject_id:
        yield _error_event("ู„ู… ูŠุชู… ุชุญุฏูŠุฏ ุงู„ู…ุงุฏุฉ ู„ู„ุณุจูˆุฑุฉ")
        return

    # Work on a copy so the caller's list is never mutated.
    current_board_state = (
        [] if frontend_board_state is None else list(frontend_board_state)
    )
    print(f" ๐Ÿ“‹ Board state from frontend: {len(current_board_state)} items")

    # Step 1: Route
    print("\n ๐Ÿ“ Step 1: Routing message...")
    chosen_file = self._route_message(user_message, username)
    print(f" ๐Ÿ“‚ Chosen file: {chosen_file}")

    # Step 2: Generate XML
    print(f" ๐Ÿค– Step 2: Generating XML response...")
    xml_response = self._generate_xml_response(user_message, chosen_file, username)
    if not xml_response:
        print(" โŒ Failed to generate response")
        yield _error_event("ุนุฐุฑุงู‹ุŒ ุญุฏุซ ุฎุทุฃ ููŠ ุงู„ู†ุธุงู…. ุญุงูˆู„ ู…ุฑุฉ ุฃุฎุฑู‰.")
        return
    print(f" ๐Ÿ“ XML response: {len(xml_response)} chars")

    # Step 3: Resolve <page> tags
    print(" ๐Ÿ“„ Step 3: Resolving <page> tags...")
    xml_response = self._resolve_page_tags(xml_response, username)

    # Step 4: Resolve <svg> tags
    print(" ๐ŸŽจ Step 4: Resolving <svg> tags...")
    xml_response = self._resolve_svg_tags(xml_response)

    # Step 5: Parse XML into segment groups
    print(" ๐Ÿ”ง Step 5: Parsing XML into segment groups...")
    raw_groups = self._parse_xml_to_raw_segments(xml_response)
    total_groups = len(raw_groups)
    print(f" ๐Ÿ“Š Found {total_groups} segment groups to stream")
    if total_groups == 0:
        yield _error_event("ู„ู… ูŠุชู… ุชูˆู„ูŠุฏ ู…ุญุชูˆู‰ ู„ู„ุณุจูˆุฑุฉ.")
        return

    # Step 6: Process and yield each group one by one
    spoken_texts = []
    replay_segments = []
    for position, group in enumerate(raw_groups):
        print(f"\n โ”€โ”€ Segment {position + 1}/{total_groups} โ”€โ”€")

        segment_board_items = []
        for board_content in group["boards"]:
            fresh_items, current_board_state = self._process_single_board(
                board_content, current_board_state
            )
            segment_board_items.extend(fresh_items)

        voice_text = group.get("voice", "")
        audio_url = None
        if voice_text:
            spoken_texts.append(voice_text)
            text_preview = voice_text[:50]
            print(f" ๐ŸŽ™๏ธ Converting TTS: '{text_preview}...'")
            audio_file = self.tts_engine.convert(voice_text)
            if not audio_file:
                print(f" โš ๏ธ TTS failed for this segment")
            else:
                audio_url = f"/static/{audio_file}"

        segment_data = {
            "type": "segment",
            "index": position,
            "total_estimate": total_groups,
            "board_items": segment_board_items,
            "voice_text": voice_text,
            "audio_url": audio_url
        }
        replay_segments.append(segment_data)
        print(f" โœ… Segment {position + 1} ready: {len(segment_board_items)} board items, voice={'yes' if voice_text else 'no'}")
        yield json.dumps(segment_data, ensure_ascii=False)

    # Step 7: Save chat history (under the sessions lock)
    with self._sessions_lock:
        stored = self._user_sessions.get(username, {})
        if "conversation_history" not in stored:
            stored["conversation_history"] = []
        stored["conversation_history"].append({
            "role": "user",
            "content": user_message
        })
        assistant_text = " ".join(spoken_texts)
        if assistant_text:
            stored["conversation_history"].append({
                "role": "assistant",
                "content": assistant_text
            })
        if len(stored["conversation_history"]) > MAX_CHAT_HISTORY:
            stored["conversation_history"] = stored["conversation_history"][-MAX_CHAT_HISTORY:]
        stored["last_sequence"] = replay_segments

    # Cleanup old audio
    self.tts_engine.cleanup_old_files()

    yield json.dumps({
        "type": "done",
        "board_state": current_board_state,
        "chosen_file": chosen_file,
        "total_segments": total_groups
    }, ensure_ascii=False)

    print(f"\n โœ… All {total_groups} segments streamed!")
    print(f" Board items: {len(current_board_state)}")
    print(f"{rule}\n")
# ─── NON-STREAMING (kept for compatibility) ───
def process_message(self, user_message, username, frontend_board_state=None):
    """Drain ``process_message_stream`` and return one combined result dict.

    Each ``segment`` event contributes a ``board_update`` entry (when it has
    board items) followed by a ``voice`` entry (when it has voice text).
    The ``done`` event supplies the final board state and chosen file.
    The first ``error`` event ends processing with ``success: False``.
    """
    sequence = []
    board_state = frontend_board_state or []
    routed_file = None

    events = self.process_message_stream(user_message, username, frontend_board_state)
    for raw_event in events:
        event = json.loads(raw_event)
        kind = event["type"]

        if kind == "error":
            return {
                "success": False,
                "error": event["message"],
                "sequence": [{
                    "type": "voice",
                    "text": event["message"],
                    "audio_url": None
                }],
                "board_state": frontend_board_state or []
            }

        if kind == "done":
            board_state = event.get("board_state", [])
            routed_file = event.get("chosen_file")
            continue

        if kind != "segment":
            continue

        if event.get("board_items"):
            sequence.append({
                "type": "board_update",
                "action": "add",
                "items": event["board_items"]
            })
        if event.get("voice_text"):
            sequence.append({
                "type": "voice",
                "text": event["voice_text"],
                "audio_url": event.get("audio_url")
            })

    return {
        "success": True,
        "chosen_file": routed_file,
        "sequence": sequence,
        "board_state": board_state
    }
# ─── Replay ───
def get_replay_sequence(self, username):
    """Return the voice entries of the user's last streamed response.

    Reads ``last_sequence`` from the user's session under the sessions lock.
    Segments without voice text are left out. When nothing is stored the
    result has ``success: False`` and an empty sequence.
    """
    with self._sessions_lock:
        stored = self._user_sessions.get(username, {})
        previous = stored.get("last_sequence", [])

    if not previous:
        return {
            "success": False,
            "error": "No previous response to replay",
            "sequence": []
        }

    voice_only = [
        {
            "type": "voice",
            "text": segment.get("voice_text", ""),
            "audio_url": segment.get("audio_url")
        }
        for segment in previous
        if segment.get("voice_text")
    ]
    print(f" ๐Ÿ”„ Replay: {len(voice_only)} voice segments")
    return {
        "success": True,
        "sequence": voice_only
    }
# ─── Clear ───
def clear_board(self, username):
    """Drop the stored replay sequence for ``username`` and report an empty board."""
    with self._sessions_lock:
        sessions = self._user_sessions
        if username in sessions:
            sessions[username]["last_sequence"] = []
    print(f" ๐Ÿ—‘๏ธ Board cleared for {username}")
    return {"success": True, "board_state": []}
def clear_chat_history(self, username):
    """Empty both the conversation history and the replay sequence of ``username``."""
    with self._sessions_lock:
        sessions = self._user_sessions
        if username in sessions:
            for field in ("conversation_history", "last_sequence"):
                sessions[username][field] = []
    print(f" ๐Ÿ—‘๏ธ Board chat history cleared for {username}")
    return {"success": True}
def clear_user_session(self, username):
    """Remove the whole in-memory board session of ``username``, if one exists."""
    with self._sessions_lock:
        # pop with a default is a no-op for unknown users, like the guarded del.
        self._user_sessions.pop(username, None)
    print(f" ๐Ÿ—‘๏ธ Full board session cleared for {username}")
    return {"success": True}
================================================================================
FILE 7: board\icons.py
FULL PATH: main_project\board\icons.py
================================================================================
"""
Icon resolver using Icons8 API.
Updated to use HTTP connection pool + thread-safe cache.
"""
import re
import threading
import requests
from config import ICONS8_SEARCH_URL, ICONS8_IMAGE_URL
from http_pool import icons8_session
class IconResolver:
    """Resolves <svg>keyword</svg> tags to Icons8 icon image URLs.

    Lookups are cached per lower-cased keyword in a dict guarded by a lock.
    A search that returns no icon is cached as ``None``; a search that fails
    with an exception is not cached.
    """

    def __init__(self):
        self._cache = {}
        self._cache_lock = threading.Lock()
        print(" โœ“ IconResolver initialized (pooled + thread-safe cache)")

    def resolve(self, search_term):
        """Return the image URL for ``search_term``, or ``None`` when unavailable."""
        if not search_term or not search_term.strip():
            return None
        term = search_term.strip().lower()

        # Cache read under the lock; a cached None is a valid "no icon" answer.
        with self._cache_lock:
            if term in self._cache:
                cached = self._cache[term]
                print(f" ๐ŸŽจ Icon cache hit: '{term}' โ†’ {cached}")
                return cached

        url = ICONS8_SEARCH_URL.format(term=requests.utils.quote(term))
        try:
            resp = icons8_session.get(url, timeout=15)
            resp.raise_for_status()
            data = resp.json()

            if not (data.get("success") and data.get("icons")):
                print(f" โš ๏ธ No icon found for: '{term}'")
                with self._cache_lock:
                    self._cache[term] = None
                return None

            first_icon = data["icons"][0]
            icon_id = first_icon["id"]
            icon_name = first_icon.get("name", term)
            image_url = ICONS8_IMAGE_URL.format(icon_id=icon_id)
            with self._cache_lock:
                self._cache[term] = image_url
            print(f" ๐ŸŽจ Icon found: '{term}' โ†’ '{icon_name}' (id={icon_id})")
            return image_url
        except Exception as e:
            print(f" โŒ Icon search error for '{term}': {e}")
            return None

    def resolve_all_in_xml(self, xml_text):
        """Replace all <svg>keyword</svg> tags with <img> tags.

        A keyword that cannot be resolved becomes a ``<note>`` placeholder
        holding the keyword. Tags are processed from last to first.
        """
        if not xml_text:
            return xml_text
        pattern = r'<svg>(.*?)</svg>'
        matches = list(re.finditer(pattern, xml_text, re.DOTALL))
        if not matches:
            return xml_text
        print(f" ๐Ÿ“ Found {len(matches)} <svg> tag(s) to resolve...")

        # Collect pieces back-to-front, then flip them once at the end.
        pieces = []
        cursor = len(xml_text)
        for match in reversed(matches):
            keyword = match.group(1).strip()
            image_url = self.resolve(keyword)
            if image_url:
                replacement = (
                    f'<img src="{image_url}" alt="{keyword}" '
                    f'w="100" h="100"/>'
                )
            else:
                replacement = (
                    f'<note color="white" w="120" h="80">'
                    f'๐Ÿ–ผ๏ธ {keyword}</note>'
                )
            pieces.append(xml_text[match.end():cursor])
            pieces.append(replacement)
            cursor = match.start()
        pieces.append(xml_text[:cursor])
        return "".join(reversed(pieces))
================================================================================
FILE 8: board\pages.py
FULL PATH: main_project\board\pages.py
================================================================================
"""
Page URL builder.
Converts <page>NUMBER</page> tags to real image URLs.
Each subject can have its own pages_base_url.
"""
import re
def build_page_url(page_number, base_url):
    """
    Convert a page number to the full image URL using the subject's base URL.

    base_url must contain the {page_num} placeholder, which is filled with
    the page number zero-padded to four digits.
    Example base_url:
    https://huggingface.co/.../page_{page_num}.png

    Returns None (after printing a diagnostic) when base_url is empty, when
    page_number is not a non-negative integer, or when base_url is not a
    usable template (unknown placeholder, positional placeholder, or
    unbalanced braces).
    """
    if not base_url:
        print(f" โŒ No pages_base_url configured")
        return None

    try:
        num = int(str(page_number).strip())
    except (ValueError, TypeError):
        print(f" โŒ Invalid page number: {page_number}")
        return None

    # A negative number would be padded to something like "-005".
    if num < 0:
        print(f" โŒ Invalid page number: {page_number}")
        return None

    page_str = str(num).zfill(4)
    try:
        url = base_url.format(page_num=page_str)
    except (KeyError, IndexError, ValueError) as exc:
        # KeyError/IndexError: template has other placeholders;
        # ValueError: unbalanced braces in the template.
        print(f" โŒ Invalid pages_base_url template: {exc!r}")
        return None

    print(f" ๐Ÿ“„ Page {num} โ†’ {url}")
    return url
def resolve_page_tags(xml_text, base_url):
    """
    Replace every <page>NUMBER</page> tag in the XML with an <img> tag
    built from the subject's base URL.

    When no URL can be built for a tag, it is replaced by a <note>
    placeholder that shows the raw page number. Tags are processed from
    last to first. Empty input and input without <page> tags are returned
    unchanged.
    """
    if not xml_text:
        return xml_text
    pattern = r'<page>\s*(.*?)\s*</page>'
    matches = list(re.finditer(pattern, xml_text, re.DOTALL))
    if not matches:
        return xml_text
    print(f" ๐Ÿ“– Found {len(matches)} <page> tag(s) to resolve...")

    # Collect pieces back-to-front, then flip them once at the end.
    pieces = []
    cursor = len(xml_text)
    for match in reversed(matches):
        page_num_raw = match.group(1).strip()
        url = build_page_url(page_num_raw, base_url)
        if not url:
            replacement = (
                f'<note color="white" w="200" h="80">'
                f'๐Ÿ“„ ุตูุญุฉ {page_num_raw}</note>'
            )
        else:
            replacement = (
                f'<img ubtype="upload" src="{url}" '
                f'alt="ุตูุญุฉ {page_num_raw}" '
                f'w="600" h="800"/>'
            )
        pieces.append(xml_text[match.end():cursor])
        pieces.append(replacement)
        cursor = match.start()
    pieces.append(xml_text[:cursor])
    return "".join(reversed(pieces))
================================================================================
FILE 9: board\routes.py
FULL PATH: main_project\board\routes.py
================================================================================
"""
Board API routes - STREAMING VERSION.
Updated to use MemoryDB for fast user lookups.
"""
from flask import (
Blueprint, request, jsonify, session, render_template,
redirect, url_for, flash, Response, stream_with_context
)
import json
from auth.helpers import is_session_valid
from memory_db import get_db
from subjects.access import validate_user_subject_access
from subjects.definitions import get_subject_name, is_board_enabled, SUBJECTS
from subjects.loader import subject_loader
board_bp = Blueprint('board', __name__)
_board_engine = None
def init_board_engine(engine):
    """Store ``engine`` in the module-level ``_board_engine`` used by the routes."""
    global _board_engine
    _board_engine = engine
@board_bp.route('/board/<subject_id>')
def board_page(subject_id):
    """Render the board page for a specific subject.

    Checks, in order: valid session, verified user, subject access, board
    enabled for the subject, subject data loadable. The first failing check
    flashes an error and redirects (to login for an invalid session, to the
    dashboard otherwise). On success the subject is set on the board engine
    and ``board.html`` is rendered.
    """
    def _back_to_dashboard(message):
        flash(message, 'error')
        return redirect(url_for('dashboard'))

    if not is_session_valid():
        session.clear()
        flash('ุงู†ุชู‡ุช ุงู„ุฌู„ุณุฉุŒ ูŠุฑุฌู‰ ุชุณุฌูŠู„ ุงู„ุฏุฎูˆู„ ู…ุฑุฉ ุฃุฎุฑู‰.', 'error')
        return redirect(url_for('auth.login'))

    username = session['username']
    user_data = get_db().read_key('users', username)
    if not (user_data and user_data.get('verified', False)):
        return _back_to_dashboard('ุญุณุงุจูƒ ุบูŠุฑ ู…ูุนู‘ู„. ุชูˆุงุตู„ ู…ุน ุงู„ุฅุฏุงุฑุฉ.')

    has_access, access_error = validate_user_subject_access(username, subject_id)
    if not has_access:
        return _back_to_dashboard(access_error)

    if not is_board_enabled(subject_id):
        return _back_to_dashboard('ู‡ุฐู‡ ุงู„ู…ุงุฏุฉ ู„ุง ุชุฏุนู… ุงู„ุณุจูˆุฑุฉ ุงู„ุชูุงุนู„ูŠุฉ ุญุงู„ูŠุงู‹.')

    if not subject_loader.load(subject_id):
        return _back_to_dashboard(f"ู…ุฌู„ุฏ ุงู„ู…ุงุฏุฉ '{subject_id}' ุบูŠุฑ ู…ูˆุฌูˆุฏ ุนู„ู‰ ุงู„ุฎุงุฏู….")

    _board_engine.set_subject(username, subject_id)
    student_type = user_data.get('student_type', 'ุนู„ู…ูŠ')
    return render_template(
        'board.html',
        username=username,
        subject_id=subject_id,
        subject_name=get_subject_name(student_type, subject_id),
    )
def _sse_error(message):
    """Build an SSE response that carries a single ``error`` event."""
    def _one_event():
        yield f"data: {json.dumps({'type': 'error', 'message': message}, ensure_ascii=False)}\n\n"
    return Response(_one_event(), mimetype='text/event-stream')


@board_bp.route('/api/board/message', methods=['POST'])
def handle_board_message():
    """STREAMING board message endpoint using SSE.

    Expects a JSON object with ``message`` and optionally ``subject_id`` and
    ``board_state``. Every validation failure is reported as a single SSE
    ``error`` event rather than an HTTP error page, so the streaming client
    always receives an event it can parse.

    When ``subject_id`` is supplied it is access-checked and set on the
    board engine; otherwise the engine's current subject for the user is
    used. The events produced by ``process_message_stream`` are forwarded
    one per SSE ``data:`` frame.
    """
    if not is_session_valid():
        return _sse_error('ุบูŠุฑ ู…ุตุฑุญ')

    # silent=True: a wrong content type or malformed body yields None here
    # instead of aborting with an HTML 400/415 the SSE client cannot read.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return _sse_error('No JSON data')

    username = session['username']

    raw_message = data.get('message')
    user_message = raw_message.strip() if isinstance(raw_message, str) else ''
    if not user_message:
        return _sse_error('Empty message')

    user = get_db().read_key('users', username)
    if not user or not user.get('verified', False):
        return _sse_error('ุบูŠุฑ ู…ุตุฑุญ')

    # A null or non-string subject_id is treated like a missing one.
    raw_subject = data.get('subject_id')
    subject_id = raw_subject.strip() if isinstance(raw_subject, str) else ''
    if subject_id:
        has_access, access_error = validate_user_subject_access(username, subject_id)
        if not has_access:
            return _sse_error(access_error)
        if not is_board_enabled(subject_id):
            return _sse_error('ู‡ุฐู‡ ุงู„ู…ุงุฏุฉ ู„ุง ุชุฏุนู… ุงู„ุณุจูˆุฑุฉ ุงู„ุชูุงุนู„ูŠุฉ')
        _board_engine.set_subject(username, subject_id)
    else:
        subject_id = _board_engine.get_subject(username)
        if not subject_id:
            return _sse_error('ู„ู… ูŠุชู… ุชุญุฏูŠุฏ ุงู„ู…ุงุฏุฉ ู„ู„ุณุจูˆุฑุฉ')

    frontend_board_state = data.get('board_state', [])
    if not isinstance(frontend_board_state, list):
        frontend_board_state = []

    print(f"\n ๐Ÿ“ฅ Board STREAM message from {username}:")
    print(f" Subject: {subject_id}")
    print(f" Message: {user_message[:80]}...")
    print(f" Board state: {len(frontend_board_state)} items")

    def generate():
        try:
            for chunk_json in _board_engine.process_message_stream(
                user_message, username, frontend_board_state
            ):
                yield f"data: {chunk_json}\n\n"
        except Exception as e:
            # Report a generic error event; details go to the server log only.
            print(f"โŒ Error in board stream: {e}")
            import traceback
            traceback.print_exc()
            error_data = json.dumps({
                "type": "error",
                "message": "ุนุฐุฑุงู‹ุŒ ุญุฏุซ ุฎุทุฃ ุบูŠุฑ ู…ุชูˆู‚ุน."
            }, ensure_ascii=False)
            yield f"data: {error_data}\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no',
            'Connection': 'keep-alive'
        }
    )
@board_bp.route('/api/board/message-sync', methods=['POST'])
def handle_board_message_sync():
    """NON-STREAMING fallback endpoint.

    Expects a JSON object with ``message`` and optionally ``subject_id`` and
    ``board_state``. Validation failures return a JSON error with status
    401, 400 or 403. On success the result of
    ``_board_engine.process_message`` is returned as JSON; an unexpected
    exception returns status 500 with a spoken-error sequence.
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    # silent=True: a wrong content type or malformed body yields None here,
    # so the JSON error below is returned instead of an HTML 400/415 page.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({"error": "No JSON data"}), 400

    username = session['username']

    raw_message = data.get('message')
    user_message = raw_message.strip() if isinstance(raw_message, str) else ''
    if not user_message:
        return jsonify({"error": "Empty message"}), 400

    user = get_db().read_key('users', username)
    if not user or not user.get('verified', False):
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ"}), 403

    # A null or non-string subject_id is treated like a missing one.
    raw_subject = data.get('subject_id')
    subject_id = raw_subject.strip() if isinstance(raw_subject, str) else ''
    if subject_id:
        has_access, access_error = validate_user_subject_access(username, subject_id)
        if not has_access:
            return jsonify({"error": access_error, "code": "no_access"}), 403
        if not is_board_enabled(subject_id):
            return jsonify({"error": "ู‡ุฐู‡ ุงู„ู…ุงุฏุฉ ู„ุง ุชุฏุนู… ุงู„ุณุจูˆุฑุฉ ุงู„ุชูุงุนู„ูŠุฉ"}), 400
        _board_engine.set_subject(username, subject_id)
    else:
        subject_id = _board_engine.get_subject(username)
        if not subject_id:
            return jsonify({"error": "ู„ู… ูŠุชู… ุชุญุฏูŠุฏ ุงู„ู…ุงุฏุฉ ู„ู„ุณุจูˆุฑุฉ", "need_subject": True}), 400

    frontend_board_state = data.get('board_state', [])
    if not isinstance(frontend_board_state, list):
        frontend_board_state = []

    try:
        result = _board_engine.process_message(
            user_message, username, frontend_board_state
        )
        return jsonify(result)
    except Exception as e:
        print(f"โŒ Error processing board message: {e}")
        import traceback
        traceback.print_exc()
        # NOTE(review): str(e) is sent to the client here, while the streaming
        # endpoint sends only a generic message — confirm this is intended.
        return jsonify({
            "success": False,
            "error": str(e),
            "sequence": [{
                "type": "voice",
                "text": "ุนุฐุฑุงู‹ุŒ ุญุฏุซ ุฎุทุฃ ุบูŠุฑ ู…ุชูˆู‚ุน.",
                "audio_url": None
            }],
            "board_state": frontend_board_state
        }), 500
@board_bp.route('/api/board/replay', methods=['POST'])
def replay_last():
    """Return the replay sequence of the logged-in user's last response (401 without a valid session)."""
    if is_session_valid():
        return jsonify(_board_engine.get_replay_sequence(session['username']))
    return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ"}), 401
@board_bp.route('/api/board/state', methods=['GET'])
def get_board_state():
    """Return an empty board state; the frontend is the source of truth (401 without a valid session)."""
    if is_session_valid():
        return jsonify({"board_state": []})
    return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ"}), 401
@board_bp.route('/api/board/clear', methods=['POST'])
def clear_board():
    """Clear the logged-in user's board via the board engine (401 without a valid session)."""
    if is_session_valid():
        return jsonify(_board_engine.clear_board(session['username']))
    return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ"}), 401
@board_bp.route('/api/board/chat/clear', methods=['POST'])
def clear_board_chat():
    """Clear the logged-in user's board conversation history (401 without a valid session)."""
    if is_session_valid():
        return jsonify(_board_engine.clear_chat_history(session['username']))
    return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ"}), 401
@board_bp.route('/api/board/sync', methods=['POST'])
def sync_board_state():
    """Acknowledge a board-state sync from the frontend; nothing is stored (401 without a valid session)."""
    if is_session_valid():
        return jsonify({"success": True})
    return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ"}), 401
@board_bp.route('/api/board/set-subject', methods=['POST'])
def set_board_subject():
    """Set/switch the board subject.

    Expects a JSON object with a non-empty string ``subject_id``. Returns
    401 without a valid session, 400 for a missing/invalid body or subject,
    403 without access, 400 when the board is not enabled for the subject
    and 404 when the subject data cannot be loaded. On success the subject
    is set on the board engine and its display name is returned.
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ"}), 401

    # silent=True: a wrong content type or malformed body yields None here,
    # so the JSON error below is returned instead of an HTML 400/415 page.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({"error": "No JSON data"}), 400

    username = session['username']

    # A null or non-string subject_id is treated like a missing one.
    raw_subject = data.get('subject_id')
    subject_id = raw_subject.strip() if isinstance(raw_subject, str) else ''
    if not subject_id:
        return jsonify({"error": "ูŠุฑุฌู‰ ุชุญุฏูŠุฏ ุงู„ู…ุงุฏุฉ"}), 400

    has_access, access_error = validate_user_subject_access(username, subject_id)
    if not has_access:
        return jsonify({"error": access_error, "code": "no_access"}), 403

    if not is_board_enabled(subject_id):
        return jsonify({"error": "ู‡ุฐู‡ ุงู„ู…ุงุฏุฉ ู„ุง ุชุฏุนู… ุงู„ุณุจูˆุฑุฉ ุงู„ุชูุงุนู„ูŠุฉ"}), 400

    subject_data = subject_loader.load(subject_id)
    if not subject_data:
        return jsonify({"error": f"ู…ุฌู„ุฏ ุงู„ู…ุงุฏุฉ '{subject_id}' ุบูŠุฑ ู…ูˆุฌูˆุฏ"}), 404

    _board_engine.set_subject(username, subject_id)

    user = get_db().read_key('users', username)
    student_type = user.get('student_type', 'ุนู„ู…ูŠ') if user else 'ุนู„ู…ูŠ'
    subject_name = get_subject_name(student_type, subject_id)
    return jsonify({
        "success": True,
        "subject_id": subject_id,
        "subject_name": subject_name,
        "message": f"ุชู… ุชุญุฏูŠุฏ ู…ุงุฏุฉ '{subject_name}' ู„ู„ุณุจูˆุฑุฉ"
    })
================================================================================
FILE 10: board\tts.py
FULL PATH: main_project\board\tts.py
================================================================================
"""
Text-to-Speech engine.
Updated to use HTTP connection pool.
"""
import os
import uuid
from pathlib import Path
from config import TTS_URL, AUDIO_OUTPUT_DIR, MAX_AUDIO_FILES, TTS_TIMEOUT
from http_pool import tts_session
class TTSEngine:
    """Text-to-speech client that saves synthesized audio as MP3 files."""

    def __init__(self):
        """Build the synthesize endpoint URL and make sure the output directory exists."""
        self.synthesize_endpoint = f"{TTS_URL}/synthesize"
        self.output_dir = AUDIO_OUTPUT_DIR
        Path(self.output_dir).mkdir(parents=True, exist_ok=True)
        print(" โœ“ TTSEngine initialized (pooled connections)")

    def convert(self, text, voice_id=None):
        """Synthesize ``text`` and save it as a new ``voice_<id>.mp3`` file.

        Args:
            text: Text to speak. Blank text or text shorter than two
                characters after stripping is rejected.
            voice_id: Optional voice identifier added to the request payload.

        Returns:
            The file name relative to ``static`` (``audio/voice_<id>.mp3``),
            or ``None`` when the text is rejected, the service does not
            answer 200 with more than 500 bytes, or any exception occurs.
        """
        if not text or not text.strip():
            return None
        clean_text = text.strip()
        if len(clean_text) < 2:
            return None

        file_id = uuid.uuid4().hex[:12]
        filename = f"audio/voice_{file_id}.mp3"
        # NOTE(review): the file is written under "static/audio" while
        # cleanup_old_files scans self.output_dir — presumably the same
        # directory; confirm against AUDIO_OUTPUT_DIR in config.
        filepath = os.path.join("static", filename)

        payload = {"text": clean_text}
        if voice_id:
            payload["voice_id"] = voice_id
        headers = {"Content-Type": "application/json"}

        try:
            response = tts_session.post(
                self.synthesize_endpoint,
                headers=headers,
                json=payload,
                timeout=TTS_TIMEOUT
            )
            if response.status_code == 200 and len(response.content) > 500:
                with open(filepath, 'wb') as f:
                    f.write(response.content)
                file_size = os.path.getsize(filepath)
                print(f" ๐Ÿ“Š TTS saved: {filename} ({file_size} bytes)")
                return filename
            else:
                print(
                    f" โŒ TTS error: status={response.status_code}, "
                    f"size={len(response.content)}"
                )
                return None
        except Exception as e:
            print(f" โŒ TTS error: {e}")
            return None

    def cleanup_old_files(self):
        """Delete the oldest ``voice_*.mp3`` files beyond ``MAX_AUDIO_FILES``.

        Files are ordered by modification time. A file that disappears while
        this runs (for example removed by a concurrent cleanup) is skipped
        instead of raising, so cleanup never aborts the caller.
        """
        audio_dir = Path(self.output_dir)

        dated = []
        for path in audio_dir.glob("voice_*.mp3"):
            try:
                dated.append((path.stat().st_mtime, path))
            except OSError:
                # Gone or unreadable between listing and stat: ignore it.
                continue

        excess = len(dated) - MAX_AUDIO_FILES
        if excess <= 0:
            return

        dated.sort(key=lambda pair: pair[0])
        for _, path in dated[:excess]:
            try:
                path.unlink()
                print(f" ๐Ÿ—‘๏ธ Cleaned: {path.name}")
            except FileNotFoundError:
                # Already removed by someone else; nothing left to do.
                pass
            except OSError as e:
                print(f" โš ๏ธ Could not remove {path.name}: {e}")
================================================================================
FILE 11: chat\__init__.py
FULL PATH: main_project\chat\__init__.py
================================================================================
"""
Chat package.
Handles text chat with AI (non-board mode).
"""
================================================================================
FILE 12: chat\agent.py
FULL PATH: main_project\chat\agent.py
================================================================================
"""
AI Agent for text chat mode.
Updated to use HTTP connection pools for 200 concurrent users.
"""
import json
import threading
from datetime import datetime, timezone
from config import (
GPT_URL, MISTRAL_COOKIE, TRANSCRIPT_URL,
CLOUDINARY_URL, CLOUDINARY_PRESET,
GPT_TIMEOUT, MISTRAL_TIMEOUT, TRANSCRIPT_TIMEOUT
)
from http_pool import gpt_session, mistral_session, transcript_session, cloudinary_session
from chat.history import ChatHistory
from subjects.loader import subject_loader
from database.chat_history import save_user_chat_to_db, load_user_chat_from_db
class AIAgent:
def __init__(self):
    """Copy the service settings from config and create the in-memory stores."""
    # Remote service settings
    self.gpt_url = GPT_URL
    self.mistral_cookie = MISTRAL_COOKIE
    self.transcript_base_url = TRANSCRIPT_URL
    self.cloudinary_url = CLOUDINARY_URL
    self.cloudinary_preset = CLOUDINARY_PRESET

    # Per-session chat state
    self.chat_history = ChatHistory()

    # Exam sessions (in-memory) with their lock
    self.exam_sessions = dict()
    self.exam_lock = threading.Lock()

    print("โœ… AIAgent initialized (pooled connections)")
# ─── GPT call (pooled) ───
def _call_gpt5(self, user_message, system_prompt, temperature=0.7):
    """POST a text request to the GPT endpoint and return the reply text.

    The system prompt is sent as the only ``chat_history`` entry. Returns the
    ``assistant_response`` field of the JSON reply ("" when absent), or
    ``None`` when the request or decoding fails.
    """
    request_body = dict(
        user_input=user_message,
        chat_history=[dict(role="system", content=system_prompt)],
        temperature=temperature,
        top_p=0.95,
        max_completion_tokens=4000,
    )
    try:
        reply = gpt_session.post(self.gpt_url, json=request_body, timeout=GPT_TIMEOUT)
        reply.raise_for_status()
        body = reply.json()
        return body.get("assistant_response", "")
    except Exception as e:
        print(f"โŒ GPT-5 Error: {e}")
        return None
def _call_gpt5_multipart(self, image_url, text_prompt, system_prompt, temperature=0.7):
    """POST an image-plus-text request to the GPT endpoint and return the reply text.

    ``chat_history`` holds the system prompt followed by one multipart user
    entry (image URL, then text); ``user_input`` is sent as ``None``. Returns
    the ``assistant_response`` field of the JSON reply ("" when absent), or
    ``None`` when the request or decoding fails.
    """
    system_entry = {"role": "system", "content": system_prompt}
    user_entry = {
        "role": "user",
        "type": "multipart",
        "content": [
            {"type": "image", "url": image_url},
            {"type": "text", "text": text_prompt}
        ]
    }
    request_body = {
        "user_input": None,
        "chat_history": [system_entry, user_entry],
        "temperature": temperature,
        "top_p": 0.95,
        "max_completion_tokens": 4000
    }
    try:
        reply = gpt_session.post(self.gpt_url, json=request_body, timeout=GPT_TIMEOUT)
        reply.raise_for_status()
        body = reply.json()
        return body.get("assistant_response", "")
    except Exception as e:
        print(f"โŒ GPT-5 Multipart Error: {e}")
        return None
# ─── Mistral streaming (pooled) ───
def _call_mistral_stream(self, user_message, file_content, session_id):
    """Stream a Mistral answer as newline-terminated JSON strings.

    The prompt is the file content, the session's chat context and the
    student's message. The reply is read line by line; each
    ``message.output.delta`` event with content is yielded as
    ``{"chunk": ...}``.

    Yields:
        * ``{"chunk": text}`` per received delta.
        * ``{"done": True, "full_response": text}`` at the end of the
          stream; a non-empty full response is first stored in the chat
          history as the assistant message.
        * ``{"error": message}`` for a non-200/201 status or any exception.

    The HTTP response is always closed, including when the consumer stops
    iterating early, so the pooled connection is released.
    """
    chat_context = self.chat_history.get_full_context(session_id)
    full_prompt = f"""{file_content}
=== ุณูŠุงู‚ ุงู„ู…ุญุงุฏุซุฉ ===
{chat_context if chat_context else "ู‡ุฐู‡ ุจุฏุงูŠุฉ ุงู„ู…ุญุงุฏุซุฉ ู…ุน ุงู„ุทุงู„ุจ."}
=== ู†ู‡ุงูŠุฉ ุงู„ุณูŠุงู‚ ===
ุณุคุงู„ ุงู„ุทุงู„ุจ ุงู„ุญุงู„ูŠ: {user_message}
ุฅุฌุงุจุชูƒ:"""
    headers = {
        "Content-Type": "application/json",
        "Cookie": self.mistral_cookie,
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }
    payload = {
        "inputs": [{
            "object": "entry",
            "type": "message.input",
            "created_at": datetime.now(timezone.utc).isoformat(),
            "role": "user",
            "content": full_prompt,
            "prefix": False
        }],
        "stream": True,
        "instructions": "",
        "tools": [],
        "completion_args": {
            "temperature": 0.7,
            "top_p": 0.95,
            "max_tokens": 4096
        },
        "model": "mistral-medium-latest"
    }

    response = None
    try:
        response = mistral_session.post(
            "https://console.mistral.ai/api-ui/bora/v1/conversations",
            headers=headers,
            json=payload,
            stream=True,
            timeout=MISTRAL_TIMEOUT
        )
        if response.status_code not in [200, 201]:
            error_msg = f"Mistral Error: {response.status_code} - {response.text[:500]}"
            yield json.dumps({"error": error_msg}) + "\n"
            return

        full_response = ""
        for line in response.iter_lines():
            if not line:
                continue
            line = line.decode('utf-8')
            if not line.startswith('data: '):
                continue
            try:
                data = json.loads(line[6:])
            except json.JSONDecodeError:
                # Non-JSON data lines are skipped.
                continue
            if data.get('type') == 'message.output.delta':
                content = data.get('content', '')
                if content:
                    full_response += content
                    yield json.dumps({"chunk": content}) + "\n"

        if full_response:
            self.chat_history.add_message(session_id, "assistant", full_response)
        yield json.dumps({"done": True, "full_response": full_response}) + "\n"
    except Exception as e:
        yield json.dumps({"error": str(e)}) + "\n"
    finally:
        # With stream=True the connection is only released once the body is
        # fully read or the response is closed; close it on every exit path.
        if response is not None:
            response.close()
# ─── Routing ───
def route_message(self, user_message, session_id):
    """Ask the GPT endpoint which subject file should answer ``user_message``.

    Returns ``"main.txt"`` when the session has no loadable subject.
    Otherwise a routing prompt is built from the subject's ``structure.txt``,
    the available files, the chat context and the last file used. The
    model's reply is matched against ``main.txt`` plus the subject's
    ``_p_files``; when it names none of them (or the call fails) the last
    file used is returned, or ``"main.txt"`` when there is none.

    Matching is case-insensitive. A file name that is only a fragment of
    another matched name is ignored, so a reply of ``p11.txt`` selects
    ``p11.txt`` rather than ``p1.txt``.
    """
    subject_id = self.chat_history.get_subject(session_id)
    subject_data = subject_loader.load(subject_id) if subject_id else None
    if not subject_data:
        return "main.txt"

    structure_content = subject_data.get("structure.txt", "")
    chat_context = self.chat_history.get_full_context(session_id)
    last_file = self.chat_history.get_last_file(session_id)
    p_files = subject_data.get("_p_files", [])
    p_files_list = "\n".join([f"- {f}" for f in p_files])
    routing_prompt = f"""{structure_content}
**ุงู„ู…ู„ูุงุช ุงู„ู…ุชุงุญุฉ:**
- main.txt: ู„ู„ุชุญูŠุงุช ูˆุงู„ุฃุณุฆู„ุฉ ุงู„ุนุงู…ุฉ
{p_files_list}
**ุณูŠุงู‚ ุงู„ู…ุญุงุฏุซุฉ:**
{chat_context if chat_context else "ู„ุง ูŠูˆุฌุฏ ุณูŠุงู‚ ุณุงุจู‚"}
**ุงู„ู…ู„ู ุงู„ุฃุฎูŠุฑ ุงู„ู…ุณุชุฎุฏู…:** {last_file if last_file else "ู„ุง ูŠูˆุฌุฏ"}
**ุชุนู„ูŠู…ุงุช:**
- ุฅุฐุง ูƒุงู†ุช ุงู„ุฑุณุงู„ุฉ ู…ุชุงุจุนุฉ ู„ู…ูˆุถูˆุน ุณุงุจู‚ โ†’ ุงุณุชุฎุฏู… ู†ูุณ ุงู„ู…ู„ู ุงู„ุฃุฎูŠุฑ: {last_file if last_file else "main.txt"}
- ุฅุฐุง ู‚ุงู„ ุงู„ุทุงู„ุจ "ุงุณุชู…ุฑ" ุฃูˆ "ุฃูƒู…ู„" โ†’ ุงุณุชุฎุฏู… ู†ูุณ ุงู„ู…ู„ู ุงู„ุฃุฎูŠุฑ
- ุฃุฌุจ **ูู‚ุท** ุจุงุณู… ุงู„ู…ู„ู (ู…ุซุงู„: p1.txt ุฃูˆ main.txt)
ุฑุณุงู„ุฉ ุงู„ุทุงู„ุจ: {user_message}
ุงู„ู…ู„ู ุงู„ู…ู†ุงุณุจ:"""

    fallback = last_file if last_file else "main.txt"
    chosen_file = self._call_gpt5(user_message, routing_prompt, temperature=0.2)
    if not chosen_file:
        return fallback

    reply = chosen_file.strip().lower()
    valid_files = ["main.txt"] + list(p_files)
    # Every valid name mentioned in the reply, in listing order.
    hits = [vf for vf in valid_files if vf and vf.lower() in reply]
    for vf in hits:
        # Skip a name that only matched as part of a longer matched name
        # (e.g. "p1.txt" inside "p11.txt").
        shadowed = any(
            len(other) > len(vf) and vf.lower() in other.lower()
            for other in hits
        )
        if not shadowed:
            return vf
    return fallback
# ─── GPT response ───
def respond_gpt(self, user_message, chosen_file, session_id):
    """Answer ``user_message`` through the GPT endpoint using the subject's ``main.txt``.

    The system prompt is ``main.txt`` followed by the chat context read
    *before* the new user message is recorded. Both the user message and
    the outcome (the reply, or a fixed apology when the call fails) are
    appended to the chat history. Without a loadable subject a fixed
    message is returned and nothing is recorded. ``chosen_file`` is not
    used by this method.
    """
    subject_id = self.chat_history.get_subject(session_id)
    subject_data = subject_loader.load(subject_id) if subject_id else None
    if not subject_data:
        return "ุนุฐุฑุงู‹ุŒ ู„ู… ูŠุชู… ุชุญุฏูŠุฏ ุงู„ู…ุงุฏุฉ. ูŠุฑุฌู‰ ุงุฎุชูŠุงุฑ ุงู„ู…ุงุฏุฉ ุฃูˆู„ุงู‹."

    main_content = subject_data.get("main.txt", "")
    chat_context = self.chat_history.get_full_context(session_id)
    system_prompt = f"""{main_content}
=== ุณูŠุงู‚ ุงู„ู…ุญุงุฏุซุฉ ุงู„ุณุงุจู‚ุฉ ===
{chat_context if chat_context else "ู‡ุฐู‡ ุจุฏุงูŠุฉ ุงู„ู…ุญุงุฏุซุฉ."}
=== ู†ู‡ุงูŠุฉ ุงู„ุณูŠุงู‚ ==="""

    self.chat_history.add_message(session_id, "user", user_message)
    answer = self._call_gpt5(user_message, system_prompt, temperature=0.8)
    if not answer:
        answer = "ุนุฐุฑุงู‹ุŒ ุญุฏุซ ุฎุทุฃ ููŠ ุงู„ู†ุธุงู…. ุญุงูˆู„ ู…ุฑุฉ ุฃุฎุฑู‰."
    self.chat_history.add_message(session_id, "assistant", answer)
    return answer
# ─── Mistral response ───
def respond_mistral_stream(self, user_message, chosen_file, session_id):
    """Return a generator that streams the Mistral answer for ``chosen_file``.

    Without a loadable subject the generator yields a single JSON error
    line. Otherwise the user message is recorded in the chat history and
    the stream from ``_call_mistral_stream`` is returned, fed with the
    content of ``chosen_file`` ("" when the subject has no such file).
    """
    subject_id = self.chat_history.get_subject(session_id)
    subject_data = subject_loader.load(subject_id) if subject_id else None

    if subject_data:
        file_content = subject_data.get(chosen_file, "")
        self.chat_history.add_message(session_id, "user", user_message)
        return self._call_mistral_stream(user_message, file_content, session_id)

    def error_gen():
        yield json.dumps({"error": "ุนุฐุฑุงู‹ุŒ ู„ู… ูŠุชู… ุชุญุฏูŠุฏ ุงู„ู…ุงุฏุฉ."}) + "\n"
    return error_gen()
# ─── Process message (route + decide) ───
def process_message(self, user_message, session_id):
    """Route ``user_message`` to a subject file and remember the choice.

    Returns ``(chosen_file, "ok")`` on success, ``(None, "no_subject")``
    when the session has no subject, or ``(None, "subject_not_found")``
    when the subject cannot be loaded.
    """
    subject = self.chat_history.get_subject(session_id)
    if not subject:
        return None, "no_subject"
    if not subject_loader.load(subject):
        return None, "subject_not_found"

    chosen_file = self.route_message(user_message, session_id)
    self.chat_history.set_last_file(session_id, chosen_file)
    print(f"๐Ÿ“€ Subject: {subject} | Routed to: {chosen_file} | Session: {session_id}")
    return chosen_file, "ok"
# ─── Subject session management ───
def select_subject(self, username, session_id, subject_id):
"""Select a subject and load/restore chat history.

Compares the session's current subject with ``subject_id``, saves the
current session to the DB under the previous subject, clears the session,
sets the new subject, loads any saved chat for it, restores it into the
session, and saves the session under the new subject.
"""
# NOTE(review): this listing has lost its indentation, so it cannot be read
# off here which of the statements below are guarded by the two `if` tests —
# confirm against the original file before changing this method.
current_subject = self.chat_history.get_subject(session_id)
# Only a switch to a different subject triggers saving the previous chat.
if current_subject and current_subject != subject_id:
raw = self.chat_history.get_raw_session(session_id)
save_user_chat_to_db(username, current_subject, raw)
self.chat_history.clear_session(session_id)
self.chat_history.set_subject(session_id, subject_id)
# Previously saved chat for the new subject, if any.
saved = load_user_chat_from_db(username, subject_id)
if saved:
self.chat_history.restore_session(session_id, saved)
raw = self.chat_history.get_raw_session(session_id)
save_user_chat_to_db(username, subject_id, raw)
def save_current_chat(self, username, session_id):
    """Save the current session to the DB under its subject; do nothing when no subject is set."""
    subject_id = self.chat_history.get_subject(session_id)
    if not subject_id:
        return
    save_user_chat_to_db(
        username,
        subject_id,
        self.chat_history.get_raw_session(session_id),
    )
# ─── Audio (pooled) ───
def upload_audio(self, audio_file_path, original_filename):
    """Upload an audio file to the transcript service.

    The file is posted as the ``audio`` form field with content type
    ``audio/webm`` to ``<transcript_base_url>/upload``.

    Returns:
        ``(file_url, None)`` when the JSON reply has ``file_url``,
        otherwise ``(None, error_text)``; any exception (including a file
        that cannot be opened) is reported as ``(None, str(exception))``.
    """
    upload_endpoint = f"{self.transcript_base_url}/upload"
    try:
        with open(audio_file_path, 'rb') as audio_handle:
            response = transcript_session.post(
                upload_endpoint,
                files={'audio': (original_filename, audio_handle, 'audio/webm')},
                timeout=TRANSCRIPT_TIMEOUT
            )
        response.raise_for_status()
        data = response.json()
        if 'file_url' not in data:
            return None, data.get('error', 'Upload failed')
        return data['file_url'], None
    except Exception as e:
        print(f"โŒ Audio upload error: {e}")
        return None, str(e)
def transcribe_audio(self, file_url):
    """Ask the transcript service to transcribe an uploaded audio file.

    Sends ``file_url`` plus a fixed prompt to ``/transcribe``. Returns
    ``(transcription, None)`` when the JSON reply has a ``transcription``
    key, otherwise ``(None, error_message)``. Any exception is logged and
    converted into the error half of the tuple.
    """
    try:
        reply = transcript_session.post(
            f"{self.transcript_base_url}/transcribe",
            json={
                "file_url": file_url,
                "prompt": "ู‡ุฐุง ุชุณุฌูŠู„ ุตูˆุชูŠ ู„ุทุงู„ุจ ูŠุณุฃู„ ุณุคุงู„ุงู‹ ุฏุฑุงุณูŠุงู‹",
            },
            timeout=TRANSCRIPT_TIMEOUT,
        )
        reply.raise_for_status()
        body = reply.json()
        if 'transcription' not in body:
            return None, body.get('error', 'Transcription failed')
        return body['transcription'], None
    except Exception as e:
        print(f"โŒ Transcription error: {e}")
        return None, str(e)
# ─── Image (pooled) ───
def upload_image_to_cloudinary(self, image_path):
    """Upload a local image through the pooled Cloudinary session.

    Posts the file as multipart field ``file`` together with the configured
    unsigned ``upload_preset``.

    Returns:
        ``(image_url, None)`` on success, ``(None, error_message)`` otherwise.
        Exceptions are converted into the error half of the tuple.
    """
    try:
        with open(image_path, 'rb') as image_file:
            response = cloudinary_session.post(
                self.cloudinary_url,
                files={'file': image_file},
                data={'upload_preset': self.cloudinary_preset},
                timeout=60,
            )
        if response.status_code != 200:
            return None, f"Cloudinary error: {response.status_code}"

        result = response.json()
        # Prefer the HTTPS link; fall back to the plain one only when the
        # secure link is absent.
        image_url = result.get('secure_url') or result.get('url')
        if not image_url:
            # A 200 without any URL used to surface as (None, None), which
            # callers cannot tell apart from success.
            return None, "Cloudinary response did not include an image URL"
        return image_url, None
    except Exception as e:
        return None, str(e)
def analyze_image(self, image_url, session_id):
    """Send an image to ``_call_gpt5_multipart`` with a fixed system prompt.

    The prompt tells the model to extract study content from the image, or
    to answer only ``<unsupported>`` when there is none. ``session_id`` is
    accepted but not used by this method. Returns whatever
    ``_call_gpt5_multipart`` returns.
    """
    extraction_instructions = """ุฃู†ุช ู…ุณุงุนุฏ ุชุนู„ูŠู…ูŠ ุฐูƒูŠ ู…ุชุฎุตุต ููŠ ุชุญู„ูŠู„ ุงู„ุตูˆุฑ ุงู„ุฏุฑุงุณูŠุฉ.
ู…ู‡ู…ุชูƒ:
1. ุฅุฐุง ูƒุงู†ุช ุงู„ุตูˆุฑุฉ ุชุญุชูˆูŠ ุนู„ู‰ ู…ุญุชูˆู‰ ุฏุฑุงุณูŠ (ู…ุนุงุฏู„ุงุชุŒ ู†ุตูˆุตุŒ ู…ุณุงุฆู„ุŒ ุฑุณูˆู… ุจูŠุงู†ูŠุฉุŒ ุฌุฏุงูˆู„ุŒ ุดุฑูˆุญุงุชุŒ ูƒุชุจุŒ ุฃูˆุฑุงู‚ ุงู…ุชุญุงู†ุงุช):
- ุงุณุชุฎุฑุฌ ูƒู„ ุงู„ู…ุญุชูˆู‰ ู…ู† ุงู„ุตูˆุฑุฉ ุจุฏู‚ุฉ ุชุงู…ุฉ
- ุงูƒุชุจ ุงู„ู…ุนุงุฏู„ุงุช ุจุตูŠุบุฉ LaTeX
- ุงูƒุชุจ ุงู„ู†ุตูˆุต ูƒู…ุง ู‡ูŠ
- ุงุดุฑุญ ุงู„ุฑุณูˆู… ุงู„ุจูŠุงู†ูŠุฉ ุฅู† ูˆุฌุฏุช
- ู‚ุฏู… ุงู„ู…ุญุชูˆู‰ ุจุดูƒู„ ู…ู†ุธู… ูˆู…ููŠุฏ
2. ุฅุฐุง ูƒุงู†ุช ุงู„ุตูˆุฑุฉ ู„ุง ุชุญุชูˆูŠ ุนู„ู‰ ู…ุญุชูˆู‰ ุฏุฑุงุณูŠ:
- ุฃุฌุจ ูู‚ุท ุจุงู„ูƒู„ู…ุฉ: <unsupported>
ู„ุง ุชุถู ุฃูŠ ุชุนู„ูŠู‚ ุฅุถุงููŠ."""
    return self._call_gpt5_multipart(
        image_url=image_url,
        text_prompt="ุญู„ู„ ู‡ุฐู‡ ุงู„ุตูˆุฑุฉ ูˆุงุณุชุฎุฑุฌ ู…ุญุชูˆุงู‡ุง ุงู„ุฏุฑุงุณูŠ",
        system_prompt=extraction_instructions,
        temperature=0.3,
    )
# ─── Exam session management ───
def create_exam_session(self, username, subject_id, door_file, question_count, difficulty):
    """Create (or replace) the exam session stored for ``username``.

    The new session starts with no questions or answers, index and score at
    zero, a ``started_at`` ISO timestamp and status ``"generating"``.
    Returns the stored session dict.
    """
    with self.exam_lock:
        fresh_session = {
            "subject_id": subject_id,
            "door_file": door_file,
            "question_count": question_count,
            "difficulty": difficulty,
            "questions": [],
            "current_index": 0,
            "score": 0,
            "answers": [],
            "started_at": datetime.now().isoformat(),
            "status": "generating",
        }
        self.exam_sessions[username] = fresh_session
        return self.exam_sessions[username]
def get_exam_session(self, username):
    """Return the exam session stored for ``username``, or ``None`` if absent."""
    with self.exam_lock:
        found = self.exam_sessions.get(username)
    return found
def clear_exam_session(self, username):
    """Drop the exam session for ``username`` if one exists, logging the removal.

    NOTE(review): nesting reconstructed from a whitespace-stripped source;
    the log line is assumed to print only when a session was removed.
    """
    with self.exam_lock:
        had_session = username in self.exam_sessions
        if had_session:
            self.exam_sessions.pop(username)
            print(f"๐Ÿ—‘๏ธ Cleared exam session for {username}")
================================================================================
FILE 13: chat\history.py
FULL PATH: main_project\chat\history.py
================================================================================
"""
In-memory chat history manager with auto-summarization.
Per-session, thread-safe. Updated to use pooled connections.
"""
import threading
from datetime import datetime, timezone
from config import GPT_URL, SUMMARY_TRIGGER_COUNT, GPT_TIMEOUT
from http_pool import gpt_session
class ChatHistory:
    """In-memory, per-session chat store with background auto-summarisation.

    Every session is a plain dict with these keys:

    * ``messages``      - messages not yet folded into the summary
    * ``full_messages`` - the complete transcript, kept for display
    * ``summary``       - running summary text returned by the GPT endpoint
    * ``message_count`` - total number of messages added
    * ``last_file``     - last content file the session was routed to
    * ``subject``       - currently selected subject id

    Writes go through ``self.lock``. Several read helpers return live
    references without holding it.
    """

    def __init__(self):
        self.gpt_url = GPT_URL
        self.sessions = {}
        self.lock = threading.Lock()
        print(" โœ“ ChatHistory initialized (pooled connections)")

    @staticmethod
    def _new_session(subject=None):
        """Return an empty session dict, optionally pre-set to ``subject``."""
        return {
            "messages": [],
            "full_messages": [],
            "summary": "",
            "message_count": 0,
            "last_file": None,
            "subject": subject,
        }

    @staticmethod
    def _render_transcript(messages):
        """Render user/assistant messages as labelled text; other roles are skipped."""
        labels = {"user": "ุงู„ุทุงู„ุจ", "assistant": "ุงู„ู…ุฏุฑุณ"}
        return "".join(
            f"{labels[m['role']]}: {m['content']}\n\n"
            for m in messages
            if m["role"] in labels
        )

    def get_or_create_session(self, session_id):
        """Return the live session dict for ``session_id``, creating it if needed."""
        with self.lock:
            if session_id not in self.sessions:
                self.sessions[session_id] = self._new_session()
            return self.sessions[session_id]

    def set_subject(self, session_id, subject):
        """Set the active subject of the session."""
        s = self.get_or_create_session(session_id)
        with self.lock:
            s["subject"] = subject

    def get_subject(self, session_id):
        """Return the session's active subject, or ``None``."""
        s = self.get_or_create_session(session_id)
        return s.get("subject", None)

    def add_message(self, session_id, role, content):
        """Append a timestamped message and trigger summarisation when due.

        Once the un-summarised backlog reaches ``SUMMARY_TRIGGER_COUNT`` a
        daemon thread is started so the caller is not blocked by the
        summary request.
        """
        s = self.get_or_create_session(session_id)
        msg = {
            "role": role,
            "content": content,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        with self.lock:
            s["messages"].append(msg)
            s["full_messages"].append(msg)
            s["message_count"] += 1
            summary_due = len(s["messages"]) >= SUMMARY_TRIGGER_COUNT
        if summary_due:
            threading.Thread(
                target=self._trigger_summary,
                args=(session_id,),
                daemon=True,
            ).start()

    def _trigger_summary(self, session_id):
        """Fold the pending messages into the session summary via the GPT endpoint.

        The pending messages are removed up front. If the request fails or
        returns an empty summary they are put back in front of whatever was
        added in the meantime, so nothing is lost.
        """
        s = self.sessions.get(session_id)
        if not s:
            return
        with self.lock:
            msgs = s["messages"].copy()
            s["messages"] = []
        if not msgs:
            return

        conv_text = self._render_transcript(msgs)
        prev_summary = s.get("summary", "")
        summary_prompt = f"""ุฃู†ุช ู†ุธุงู… ุชู„ุฎูŠุต ุฐูƒูŠ ู„ู…ุญุงุฏุซุงุช ุชุนู„ูŠู…ูŠุฉ.
ู…ู‡ู…ุชูƒ: ุชู„ุฎูŠุต ุงู„ู…ุญุงุฏุซุฉ ุงู„ุชุงู„ูŠุฉ ุจุดูƒู„ ุฏู‚ูŠู‚ ูˆู…ููŠุฏ.
**ุงู„ุชุนู„ูŠู…ุงุช:**
1. ู„ุฎู‘ุต ุงู„ู†ู‚ุงุท ุงู„ุฑุฆูŠุณูŠุฉ ุงู„ุชูŠ ู†ุงู‚ุดู‡ุง ุงู„ุทุงู„ุจ ูˆุงู„ู…ุฏุฑุณ
2. ุญุฏุฏ ุงู„ู…ูˆุงุถูŠุน ุงู„ุชูŠ ุณุฃู„ ุนู†ู‡ุง ุงู„ุทุงู„ุจ
3. ู‚ูŠู‘ู… ู…ุณุชูˆู‰ ูู‡ู… ุงู„ุทุงู„ุจ (ู…ุจุชุฏุฆ / ู…ุชูˆุณุท / ู…ุชู‚ุฏู…)
4. ุงุฐูƒุฑ ุฃูŠ ู†ู‚ุงุท ุถุนู ุฃูˆ ู‚ูˆุฉ ู„ุงุญุธุชู‡ุง
5. ุงุฐูƒุฑ ุขุฎุฑ ู…ูˆุถูˆุน ูƒุงู† ูŠูู†ุงู‚ุด
6. ุงุฌุนู„ ุงู„ุชู„ุฎูŠุต ู…ุฎุชุตุฑุงู‹ ู„ูƒู† ุดุงู…ู„ุงู‹
{"**ุงู„ุชู„ุฎูŠุต ุงู„ุณุงุจู‚:**" + chr(10) + prev_summary + chr(10) if prev_summary else ""}
**ุงู„ู…ุญุงุฏุซุฉ ุงู„ุฌุฏูŠุฏุฉ:**
{conv_text}
**ุงู„ุชู„ุฎูŠุต ุงู„ุดุงู…ู„:**"""
        try:
            payload = {
                "user_input": "ู‚ู… ุจุชู„ุฎูŠุต ุงู„ู…ุญุงุฏุซุฉ",
                "chat_history": [{"role": "system", "content": summary_prompt}],
                "temperature": 0.3,
                "top_p": 0.95,
                "max_completion_tokens": 1500,
            }
            resp = gpt_session.post(self.gpt_url, json=payload, timeout=GPT_TIMEOUT)
            resp.raise_for_status()
            new_summary = resp.json().get("assistant_response", "")
            if new_summary:
                with self.lock:
                    s["summary"] = new_summary
                print(f" โœ… Summary generated for session {session_id[:20]}...")
            else:
                # Empty summary: put the messages back so they stay in context.
                with self.lock:
                    s["messages"] = msgs + s["messages"]
        except Exception as e:
            print(f"โŒ Summary error: {e}")
            with self.lock:
                s["messages"] = msgs + s["messages"]

    def get_full_context(self, session_id):
        """Return the summary plus the un-summarised messages as prompt text."""
        s = self.get_or_create_session(session_id)
        parts = []
        if s["summary"]:
            parts.append(f"**ู…ู„ุฎุต ุงู„ู…ุญุงุฏุซุฉ ุงู„ุณุงุจู‚ุฉ:**\n{s['summary']}")
        if s["messages"]:
            recent = self._render_transcript(s["messages"])
            if recent:
                parts.append(f"**ุงู„ู…ุญุงุฏุซุฉ ุงู„ุญุงู„ูŠุฉ:**\n{recent}")
        return "\n\n".join(parts)

    def get_messages_for_display(self, session_id):
        """Return a copy of the complete transcript."""
        s = self.get_or_create_session(session_id)
        with self.lock:
            return s["full_messages"].copy()

    def get_summary(self, session_id):
        """Return the session's summary text, or an empty string."""
        s = self.get_or_create_session(session_id)
        return s.get("summary", "")

    def set_last_file(self, session_id, filename):
        """Record the content file the session was last routed to."""
        s = self.get_or_create_session(session_id)
        with self.lock:
            s["last_file"] = filename

    def get_last_file(self, session_id):
        """Return the last routed content file, or ``None``."""
        s = self.get_or_create_session(session_id)
        return s.get("last_file", None)

    def clear_session(self, session_id):
        """Reset an existing session to empty while keeping its subject."""
        with self.lock:
            if session_id in self.sessions:
                current_subject = self.sessions[session_id].get("subject", None)
                self.sessions[session_id] = self._new_session(current_subject)

    def restore_session(self, session_id, saved_data):
        """Restore a session from saved DB data.

        The message lists are copied. Previously the session shared list
        objects with ``saved_data``, and when ``full_messages`` was missing
        ``messages`` and ``full_messages`` were the very same list, which
        ``add_message`` then appended to twice.
        """
        s = self.get_or_create_session(session_id)
        pending = saved_data.get("messages") or []
        transcript = saved_data.get("full_messages")
        if transcript is None:
            # Older records may only carry ``messages``.
            transcript = pending
        with self.lock:
            s["messages"] = list(pending)
            s["full_messages"] = list(transcript)
            s["summary"] = saved_data.get("summary", "")
            s["message_count"] = saved_data.get("message_count", 0)
            s["last_file"] = saved_data.get("last_file", None)
            s["subject"] = saved_data.get("subject", s.get("subject"))

    def get_session_info(self, session_id):
        """Return counters and a summary preview (first 200 chars) for the session."""
        s = self.get_or_create_session(session_id)
        return {
            "total_messages": s["message_count"],
            "current_messages": len(s["messages"]),
            "has_summary": bool(s["summary"]),
            "last_file": s["last_file"],
            "subject": s["subject"],
            "summary_preview": (
                s["summary"][:200] + "..."
                if len(s.get("summary", "")) > 200
                else s.get("summary", "")
            ),
        }

    def get_raw_session(self, session_id):
        """Get raw session dict for saving to DB (the live dict, not a copy)."""
        return self.get_or_create_session(session_id)
================================================================================
FILE 14: chat\routes.py
FULL PATH: main_project\chat\routes.py
================================================================================
```python
"""
Chat API routes for text-based AI chat.
Updated to use MemoryDB.
"""
import json
import os
import tempfile
from datetime import datetime
from flask import Blueprint, request, jsonify, Response, stream_with_context, session, render_template, redirect, url_for, flash
import requests as http_requests
from auth.helpers import is_session_valid
from memory_db import get_db
from database.chat_history import (
save_user_chat_to_db, clear_user_subject_chat_from_db,
get_all_user_chats_from_db
)
from subjects.access import validate_user_subject_access, get_user_accessible_subjects
from subjects.definitions import SUBJECTS, get_subject_name
from subjects.loader import subject_loader
chat_bp = Blueprint('chat', __name__)
_agent = None
def init_chat_agent(agent):
    """Store ``agent`` in the module-level ``_agent`` used by the chat routes."""
    global _agent
    _agent = agent
@chat_bp.route('/chat_page')
def chat_page():
    """Render the chat page for a logged-in, verified user.

    An invalid session is cleared and redirected to the login page. A
    missing or unverified account is redirected to the dashboard.
    """
    if not is_session_valid():
        session.clear()
        flash('ุงู†ุชู‡ุช ุงู„ุฌู„ุณุฉุŒ ูŠุฑุฌู‰ ุชุณุฌูŠู„ ุงู„ุฏุฎูˆู„ ู…ุฑุฉ ุฃุฎุฑู‰.', 'error')
        return redirect(url_for('auth.login'))

    username = session['username']
    account = get_db().read_key('users', username)
    is_verified = bool(account) and bool(account.get('verified', False))
    if not is_verified:
        flash('ุญุณุงุจูƒ ุบูŠุฑ ู…ูุนู‘ู„. ุชูˆุงุตู„ ู…ุน ุงู„ุฅุฏุงุฑุฉ.', 'error')
        return redirect(url_for('dashboard'))

    return render_template('chat.html', username=username)
@chat_bp.route('/me', methods=['GET'])
def get_me():
    """Return the logged-in user's profile and accessible subjects as JSON.

    Responds 401 for an invalid session, 404 for an unknown user, 403 for an
    unverified account and 400 when the subject lookup reports an error.
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    username = session['username']
    account = get_db().read_key('users', username)
    if not account:
        return jsonify({"error": "ุงู„ู…ุณุชุฎุฏู… ุบูŠุฑ ู…ูˆุฌูˆุฏ", "code": "user_not_found"}), 404
    if not account.get('verified', False):
        return jsonify({"error": "ุงู„ุญุณุงุจ ุบูŠุฑ ู…ูุนู‘ู„", "code": "not_verified"}), 403

    subjects, error = get_user_accessible_subjects(username)
    if error:
        return jsonify({"error": error}), 400

    profile = {
        "username": username,
        "student_type": account.get('student_type', ''),
        "balance": account.get('balance', 0),
        "session_id": account.get('session_id', username),
        "subjects": subjects,
    }
    return jsonify(profile)
@chat_bp.route('/subjects', methods=['GET'])
def get_subjects():
    """Return the user's subject list with total and accessible counts."""
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    subjects, error = get_user_accessible_subjects(session['username'])
    if error:
        return jsonify({"error": error}), 400

    accessible_total = sum(1 for entry in subjects if entry['accessible'])
    return jsonify({
        "subjects": subjects,
        "count": len(subjects),
        "accessible_count": accessible_total,
    })
@chat_bp.route('/select-subject', methods=['POST'])
def select_subject():
    """Select a subject for the logged-in user and return its saved chat.

    Expects a JSON object with a ``subject`` field. Responds 401 for an
    invalid session, 400 when no subject is given, 403 when the user may not
    access it and 404 when its content cannot be loaded.
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    # A missing, malformed or non-object JSON body is treated as "no subject"
    # instead of raising AttributeError on ``None.get`` (HTTP 500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = session['username']
    # ``str(...)`` guards against non-string values such as numbers or null.
    subject_id = str(data.get('subject') or '').strip()
    if not subject_id:
        return jsonify({"error": "ูŠุฑุฌู‰ ุชุญุฏูŠุฏ ุงู„ู…ุงุฏุฉ", "code": "no_subject"}), 400

    has_access, access_error = validate_user_subject_access(username, subject_id)
    if not has_access:
        return jsonify({"error": access_error, "code": "no_access"}), 403

    subject_data = subject_loader.load(subject_id)
    if not subject_data:
        return jsonify({"error": f"ู…ุฌู„ุฏ ุงู„ู…ุงุฏุฉ '{subject_id}' ุบูŠุฑ ู…ูˆุฌูˆุฏ ุนู„ู‰ ุงู„ุฎุงุฏู…"}), 404

    # Chat sessions are keyed by username.
    session_id = username
    _agent.select_subject(username, session_id, subject_id)

    user = get_db().read_key('users', username)
    student_type = user.get('student_type', '') if user else ''
    subject_name = get_subject_name(student_type, subject_id)
    p_files = subject_data.get("_p_files", [])
    s = _agent.chat_history.get_or_create_session(session_id)
    full_messages = s.get("full_messages", [])
    return jsonify({
        "success": True,
        "subject_id": subject_id,
        "subject_name": subject_name,
        "chapters_count": len(p_files),
        "files": ["main.txt"] + p_files,
        "message": f"ุชู… ุชุญู…ูŠู„ ู…ุงุฏุฉ '{subject_name}' ุจู†ุฌุงุญ! ({len(p_files)} ูุตูˆู„)",
        "chat_history": full_messages,
        "history_count": len(full_messages),
    })
@chat_bp.route('/switch-subject', methods=['POST'])
def switch_subject():
    """Switch the logged-in user to another subject and return its saved chat.

    Expects a JSON object with a ``subject`` field. Responds 401 for an
    invalid session, 400 when no subject is given, 403 when the user may not
    access it and 404 when its content cannot be loaded.
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    # A missing, malformed or non-object JSON body is treated as "no subject"
    # instead of raising AttributeError on ``None.get`` (HTTP 500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = session['username']
    # ``str(...)`` guards against non-string values such as numbers or null.
    new_subject_id = str(data.get('subject') or '').strip()
    if not new_subject_id:
        return jsonify({"error": "ูŠุฑุฌู‰ ุชุญุฏูŠุฏ ุงุณู… ุงู„ู…ุงุฏุฉ ุงู„ุฌุฏูŠุฏุฉ"}), 400

    has_access, access_error = validate_user_subject_access(username, new_subject_id)
    if not has_access:
        return jsonify({"error": access_error, "code": "no_access"}), 403

    subject_data = subject_loader.load(new_subject_id)
    if not subject_data:
        return jsonify({"error": f"ู…ุฌู„ุฏ ุงู„ู…ุงุฏุฉ '{new_subject_id}' ุบูŠุฑ ู…ูˆุฌูˆุฏ ุนู„ู‰ ุงู„ุฎุงุฏู…"}), 404

    # Chat sessions are keyed by username.
    session_id = username
    _agent.select_subject(username, session_id, new_subject_id)

    user = get_db().read_key('users', username)
    student_type = user.get('student_type', '') if user else ''
    subject_name = get_subject_name(student_type, new_subject_id)
    p_files = subject_data.get("_p_files", [])
    s = _agent.chat_history.get_or_create_session(session_id)
    full_messages = s.get("full_messages", [])
    return jsonify({
        "success": True,
        "subject_id": new_subject_id,
        "subject_name": subject_name,
        "chapters_count": len(p_files),
        "files": ["main.txt"] + p_files,
        "message": f"ุชู… ุงู„ุชุจุฏูŠู„ ุฅู„ู‰ ู…ุงุฏุฉ '{subject_name}' ุจู†ุฌุงุญ!",
        "chat_history": full_messages,
        "history_count": len(full_messages),
    })
@chat_bp.route('/chat', methods=['POST'])
def chat():
    """Handle one chat message for the logged-in user.

    Expects a JSON object with a ``message`` field. The message is routed to
    a content file. ``main.txt`` is answered inline (``type: "gpt"``). Any
    other file returns a ``stream_url`` that the client opens to receive the
    answer as server-sent events (``type: "mistral"``).

    Responds 401/403/404 for session, verification and access problems,
    400 for an empty message or missing subject, and 500 on unexpected errors.
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    # A missing, malformed or non-object JSON body is treated as an empty
    # message instead of raising AttributeError on ``None.get`` (HTTP 500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    # ``str(...)`` guards against non-string values such as numbers or null.
    user_message = str(data.get('message') or '').strip()
    username = session['username']
    if not user_message:
        return jsonify({"error": "ุฑุณุงู„ุฉ ูุงุฑุบุฉ"}), 400

    # Chat sessions are keyed by username.
    session_id = username
    user = get_db().read_key('users', username)
    if not user:
        return jsonify({"error": "ุงู„ู…ุณุชุฎุฏู… ุบูŠุฑ ู…ูˆุฌูˆุฏ", "code": "user_not_found"}), 404
    if not user.get('verified', False):
        return jsonify({"error": "ุงู„ุญุณุงุจ ุบูŠุฑ ู…ูุนู‘ู„", "code": "not_verified"}), 403

    current_subject = _agent.chat_history.get_subject(session_id)
    if not current_subject:
        return jsonify({"error": "ู„ู… ูŠุชู… ุงุฎุชูŠุงุฑ ุงู„ู…ุงุฏุฉ ุจุนุฏ", "need_subject": True}), 400

    has_access, access_error = validate_user_subject_access(username, current_subject)
    if not has_access:
        return jsonify({"error": access_error, "code": "no_access"}), 403

    try:
        chosen_file, status = _agent.process_message(user_message, session_id)
        if status == "no_subject":
            return jsonify({"error": "ูŠุฑุฌู‰ ุงุฎุชูŠุงุฑ ุงู„ู…ุงุฏุฉ ุฃูˆู„ุงู‹", "need_subject": True}), 400
        if status == "subject_not_found":
            return jsonify({"error": f"ู…ุฌู„ุฏ ุงู„ู…ุงุฏุฉ '{current_subject}' ุบูŠุฑ ู…ูˆุฌูˆุฏ"}), 404

        if chosen_file == "main.txt":
            response = _agent.respond_gpt(user_message, chosen_file, session_id)
            _agent.save_current_chat(username, session_id)
            session_info = _agent.chat_history.get_session_info(session_id)
            return jsonify({
                "type": "gpt",
                "subject": current_subject,
                "chosen_file": chosen_file,
                "response": response,
                "session_info": session_info,
            })

        session_info = _agent.chat_history.get_session_info(session_id)
        quote = http_requests.utils.quote
        return jsonify({
            "type": "mistral",
            "subject": current_subject,
            "chosen_file": chosen_file,
            # Every query value is percent-encoded; the file name used to be
            # interpolated raw, which breaks on '&', '#' or spaces.
            "stream_url": (
                f"/stream?file={quote(chosen_file)}"
                f"&session={quote(session_id)}"
                f"&message={quote(user_message)}"
            ),
            "session_info": session_info,
        })
    except Exception as e:
        print(f"โŒ Chat error: {e}")
        return jsonify({"error": str(e)}), 500
@chat_bp.route('/stream')
def stream():
    """Stream the answer to a chat message as server-sent events.

    Query parameters: ``message`` (the user text) and ``file`` (content file,
    default ``main.txt``). Errors are reported as a single SSE event whose
    payload is ``{"error": ...}``.

    The chat session is always the logged-in user's own. A ``session`` query
    parameter is still accepted in the URL but ignored. It used to select the
    session, which let any authenticated user stream against, and save,
    another user's chat history.
    """
    def sse_error(message):
        """Build a one-event SSE response carrying an error payload."""
        def events():
            yield f"data: {json.dumps({'error': message})}\n\n"
        return Response(events(), mimetype='text/event-stream')

    if not is_session_valid():
        return sse_error('ุบูŠุฑ ู…ุตุฑุญ')

    username = session['username']
    user_message = request.args.get('message', '')
    # NOTE(review): ``file`` is client-controlled and passed on unvalidated;
    # consider checking it against the subject's known files.
    chosen_file = request.args.get('file', 'main.txt')
    # The other chat routes key sessions by username; do the same here
    # rather than trusting the client-supplied ``session`` value.
    session_id = username

    user = get_db().read_key('users', username)
    if not user or not user.get('verified', False):
        return sse_error('ุบูŠุฑ ู…ุตุฑุญ')

    current_subject = _agent.chat_history.get_subject(session_id)
    if current_subject:
        has_access, _ = validate_user_subject_access(username, current_subject)
        if not has_access:
            return sse_error('ู„ูŠุณ ู„ุฏูŠูƒ ุตู„ุงุญูŠุฉ ุงู„ูˆุตูˆู„ ู„ู‡ุฐู‡ ุงู„ู…ุงุฏุฉ')

    def generate():
        try:
            for chunk in _agent.respond_mistral_stream(user_message, chosen_file, session_id):
                # Chunks that are not JSON objects are forwarded untouched.
                try:
                    parsed = json.loads(chunk.strip())
                except (ValueError, TypeError, AttributeError):
                    parsed = None
                if isinstance(parsed, dict) and parsed.get("done"):
                    # Saving is best-effort: a failure must not break the
                    # stream, but it is logged rather than silently dropped.
                    try:
                        _agent.save_current_chat(username, session_id)
                    except Exception as save_error:
                        print(f"Chat save error after stream: {save_error}")
                yield f"data: {chunk}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no',
            'Connection': 'keep-alive',
        },
    )
@chat_bp.route('/clear', methods=['POST'])
def clear_history():
    """Clear the user's in-memory chat and delete the saved copy for the current subject.

    The subject selection itself is kept and echoed back as ``subject_kept``.
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    username = session['username']
    history = _agent.chat_history
    # Chat sessions are keyed by username.
    active_subject = history.get_subject(username)
    history.clear_session(username)
    if active_subject:
        clear_user_subject_chat_from_db(username, active_subject)

    return jsonify({
        "success": True,
        "message": "ุชู… ู…ุณุญ ุณุฌู„ ุงู„ู…ุญุงุฏุซุฉ ู„ู„ู…ุงุฏุฉ ุงู„ุญุงู„ูŠุฉ",
        "subject_kept": active_subject,
    })
@chat_bp.route('/session-info', methods=['POST'])
def session_info():
    """Return the chat-session counters and summary preview for the user."""
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    # Chat sessions are keyed by username.
    chat_session_id = session['username']
    return jsonify(_agent.chat_history.get_session_info(chat_session_id))
@chat_bp.route('/summary', methods=['POST'])
def get_summary():
    """Return the user's conversation summary, or a placeholder text when empty."""
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    username = session['username']
    history = _agent.chat_history
    # Chat sessions are keyed by username.
    summary_text = history.get_summary(username)
    active_subject = history.get_subject(username)
    if not summary_text:
        summary_text = "ู„ุง ูŠูˆุฌุฏ ู…ู„ุฎุต ุจุนุฏ. ุณูŠุชู… ุฅู†ุดุงุก ู…ู„ุฎุต ุชู„ู‚ุงุฆูŠุงู‹ ุจุนุฏ 10 ุฑุณุงุฆู„."

    return jsonify({
        "username": username,
        "subject": active_subject,
        "summary": summary_text,
    })
@chat_bp.route('/reload-subject', methods=['POST'])
def reload_subject():
    """Reload a subject's content via ``subject_loader.reload``.

    Expects a JSON object with a ``subject`` field. Responds 401 for an
    invalid session, 400 when no subject is given, 403 when the user may not
    access it and 404 when the reload returns nothing.
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    # A missing, malformed or non-object JSON body is treated as "no subject"
    # instead of raising AttributeError on ``None.get`` (HTTP 500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = session['username']
    # ``str(...)`` guards against non-string values such as numbers or null.
    subject_id = str(data.get('subject') or '').strip()
    if not subject_id:
        return jsonify({"error": "ูŠุฑุฌู‰ ุชุญุฏูŠุฏ ุงุณู… ุงู„ู…ุงุฏุฉ"}), 400

    has_access, access_error = validate_user_subject_access(username, subject_id)
    if not has_access:
        return jsonify({"error": access_error, "code": "no_access"}), 403

    subject_data = subject_loader.reload(subject_id)
    if not subject_data:
        return jsonify({"error": f"ู…ุฌู„ุฏ ุงู„ู…ุงุฏุฉ '{subject_id}' ุบูŠุฑ ู…ูˆุฌูˆุฏ ุนู„ู‰ ุงู„ุฎุงุฏู…"}), 404

    p_files = subject_data.get("_p_files", [])
    return jsonify({
        "success": True,
        "subject_id": subject_id,
        "chapters_count": len(p_files),
        "message": f"ุชู… ุฅุนุงุฏุฉ ุชุญู…ูŠู„ ุงู„ู…ุงุฏุฉ '{subject_id}' ุจู†ุฌุงุญ!",
    })
@chat_bp.route('/get-chat-history', methods=['GET'])
def get_chat_history():
    """Return the user's in-memory transcript and the current subject's name."""
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    username = session['username']
    # Chat sessions are keyed by username.
    chat_session = _agent.chat_history.get_or_create_session(username)
    transcript = chat_session.get("full_messages", chat_session.get("messages", []))
    subject = chat_session.get("subject", None)

    subject_name = None
    if subject:
        account = get_db().read_key('users', username)
        student_type = account.get('student_type', '') if account else ''
        subject_name = get_subject_name(student_type, subject)

    return jsonify({
        "messages": transcript,
        "subject": subject,
        "subject_name": subject_name,
        "count": len(transcript),
    })
@chat_bp.route('/get-all-chats', methods=['GET'])
def get_all_chats():
    """List every saved chat of the user with message count and save time."""
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    username = session['username']
    saved_chats = get_all_user_chats_from_db(username)
    account = get_db().read_key('users', username)
    student_type = account.get('student_type', '') if account else ''

    overview = [
        {
            "subject_id": subject_id,
            "subject_name": get_subject_name(student_type, subject_id),
            "message_count": len(record.get("full_messages", [])),
            "last_saved": record.get("saved_at", ""),
            "has_summary": bool(record.get("summary", "")),
        }
        for subject_id, record in saved_chats.items()
    ]
    return jsonify({
        "chats": overview,
        "total": len(overview),
    })
================================================================================
FILE 15: config.py
FULL PATH: main_project\config.py
================================================================================
"""
Global configuration and constants.
All settings in one place.
"""
import os
import secrets
# ─── Flask ───
# NOTE(review): the fallback is a fixed, publicly visible key. Set SECRET_KEY
# in the environment for any real deployment.
SECRET_KEY = os.environ.get("SECRET_KEY", "corvo-ai-fixed-secret-key-change-in-production-2024")
# DEBUG is enabled only when the env var is exactly "true" (case-insensitive).
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"
HOST = os.environ.get("HOST", "0.0.0.0")
PORT = int(os.environ.get("PORT", 7860))
# ─── AI endpoints ───
GPT_URL = "https://corvo-ai-gpt-5-4.hf.space/chat"
TTS_URL = "https://corvo-ai-tts.hf.space"
TRANSCRIPT_URL = "https://corvo-ai-transcript.hf.space"
# ─── Connection Pool Settings ───
# Pool sizes per upstream service; presumably consumed by http_pool (not shown here).
GPT_POOL_SIZE = 40
TTS_POOL_SIZE = 30
MISTRAL_POOL_SIZE = 30
TRANSCRIPT_POOL_SIZE = 10
CLOUDINARY_POOL_SIZE = 10
# Request timeouts, passed as ``timeout=`` to the pooled sessions (seconds).
GPT_TIMEOUT = 120
TTS_TIMEOUT = 60
MISTRAL_TIMEOUT = 120
TRANSCRIPT_TIMEOUT = 120
# ─── Cloudinary ───
CLOUDINARY_URL = "https://api.cloudinary.com/v1_1/dwsoob1wh/image/upload"
CLOUDINARY_PRESET = "Cloud-storage"
# ─── Icons8 ───
# Search URL template; ``{term}`` is filled in by the caller.
ICONS8_SEARCH_URL = (
"https://search-app.icons8.com/api/iconsets/v7/search"
"?isAnimated=false&language=en&analytics=true"
"&spellcheck=false&saveAnalytics=true"
"&amount=10&isOuch=true"
"&replaceNameWithSynonyms=true"
"&offset=0&term={term}"
)
# Image URL template; ``{icon_id}`` is filled in by the caller.
ICONS8_IMAGE_URL = "https://img.icons8.com/?size=100&id={icon_id}&format=png"
# ─── Audio ───
AUDIO_OUTPUT_DIR = "static/audio"
MAX_AUDIO_FILES = 500
# ─── Chat history ───
# Number of un-summarised messages at which ChatHistory starts a summary.
SUMMARY_TRIGGER_COUNT = 10
MAX_CHAT_HISTORY = 20
# ─── Directories to create on startup ───
REQUIRED_DIRS = [
"static/audio",
"templates",
"FINAL",
]
# ─── Mistral cookie ───
# Cookie header value for the Mistral integration; the MISTRAL_COOKIE env var
# takes precedence over the literal below.
# NOTE(review): the fallback embeds what look like live session/CSRF cookies
# in source control. Treat them as leaked: rotate them and supply the value
# via the environment only.
MISTRAL_COOKIE = os.environ.get("MISTRAL_COOKIE", (
'''__cf_bm=JrqUGrlLa1VNznB_gcRKdDY.AyPLIvyRVUuKMV0avmc-1771531995-1.0.1.1-gJPuKUhLOcJ6hccPtwfCq6jJOAkQ8xABZJi_RwovhSQrsUTbUv1MCh1IPLvZ3ldPzqHLrPmcWUwLE_q_iVrP.W9PCYLnRq4HMMbVeyuDwlE; _cfuvid=oL4ochUsTJrLY6FlPpKTfocKdvOyum8GRuUBbqS9l8U-1771531995497-0.0.1.1-604800000; csrf_token_1d61ec8f0158ec4868343239ec73dbe1bfebad9908ad860e62f470c767573d0d=6ybeYU0W24tff63a5O/h0wbG6pYuc4w4iKv6SOKnYFc=; ory_session_coolcurranf83m3srkfl=MTc3MTUzMjA2OXxZU1dnRmd5SzNZM3o5VERSMGN5OFBWaHZ4bmdzd2RLOXNTUHNqMVVQbUdqUVFyZXpFN1ZCa0stYmJOR1lmbFNHVWphdWE4UWZ4S2FESjVROVJVYkw3a0MxN01TVVRZSUFGcWdZRHBCSjA2WEFJcFowcERmTWdrTmxuZ01zLWtubWxtd3lELVBBSVhOSzRkS1RRTWlxUHo1WmJwRWZzNlBXOFZWbUQyY2xkRVlOdXczb19kMlA4enc2ZGdVbkhaOUtXNDhEcEtxTVVBUTFYVk5YUXUtbDhjNVBOaG04WDRncmVNeXV4WWZXNC05TTlZRml3Y2hmR1F3QUR4Wk1aS0wwVUFfWGFyb2YxUlUtZXJkMGVhc3B8ITaivuTKOyk2pkoOhT6SPvP7zk-_NWF074pMm1HUl-k=; intercom-device-id-xel0jpx9=16cbae05-b305-4216-9967-e1622600c85c; csrftoken=KevqOi51yhx99LPY2i96wpMb2BxPlbpA; __cflb=0H28vBt3Asif1pksrBB47e5ijRcsvN4rdm4JmqYW3k8; intercom-session-xel0jpx9=K0lUK3E1QkhQemZuN256UXRtUVIxQWRoTVNQVXA2OVhXbDd1UHlGb3hxVmFVN0tJTG5LdFEvelpJck5aR0FjV3VWY0NjR1piMUxUdkZUc09kLzVDMTIrRGZiU3BtRElIQ0FvZUVZdHZyNU5sZU9EU0E3cHFCK1FzdE9kUTFKaG5PclZJTGRxN1c2WUt1NEVLNXBqZUwySStMQ2JHWHo3Z3Vyc2xLSWh2YkVDbWJBOWJWeHpHNFJIUzltZWUvQXVXLS1jdGVHOFVGRTJNS3I2Qm1tWlk3UEVBPT0=--d3764aa89dd7c5400eb27bf4e34ad8c85402eb72'''
))
================================================================================
FILE 16: database\__init__.py
FULL PATH: main_project\database\__init__.py
================================================================================
"""
Database package.
Handles all JSON file persistence.
"""
================================================================================
FILE 17: database\cards.py
FULL PATH: main_project\database\cards.py
================================================================================
"""
Cards database operations.
Pure in-memory via MemoryDB. No local files.
"""
from memory_db import get_db
def init_cards_db():
    """Log how many records the shared DB currently holds in ``cards``."""
    total = get_db().count('cards')
    print(f" โœ… Cards DB ready: {total} cards in memory")
def load_cards_db():
    """Return the result of reading the whole ``cards`` table."""
    db = get_db()
    return db.read('cards')
def save_cards_db(data):
    """Replace the whole ``cards`` table with ``data``."""
    db = get_db()
    db.write_full('cards', data)
def get_card(code):
    """Return the ``cards`` record stored under ``code``."""
    db = get_db()
    return db.read_key('cards', code)
def set_card(code, card_data):
    """Store ``card_data`` in ``cards`` under ``code``."""
    db = get_db()
    db.write('cards', code, card_data)
def card_exists(code):
    """Return whether ``cards`` has an entry for ``code``."""
    db = get_db()
    return db.has_key('cards', code)
================================================================================
FILE 18: database\chat_history.py
FULL PATH: main_project\database\chat_history.py
================================================================================
"""
Chat history database.
Pure in-memory via MemoryDB. No local files.
"""
from datetime import datetime
from memory_db import get_db
def init_chat_history_db():
    """Log how many records the shared DB currently holds in ``chat_history``."""
    total = get_db().count('chat_history')
    print(f" โœ… Chat History DB ready: {total} records in memory")
def make_chat_db_key(username, subject_id):
    """Build the ``chat_history`` key: username and subject joined by ``__``."""
    return "{}__{}".format(username, subject_id)
def save_user_chat_to_db(username, subject_id, session_data):
    """Write a snapshot record of ``session_data`` to ``chat_history``.

    The record is stored under ``make_chat_db_key(username, subject_id)`` and
    stamped with ``saved_at``. Failures are logged and never raised.
    """
    try:
        db = get_db()
        record_key = make_chat_db_key(username, subject_id)
        read = session_data.get
        record = {
            "username": username,
            "subject_id": subject_id,
            "messages": read("messages", []),
            "full_messages": read("full_messages", []),
            "summary": read("summary", ""),
            "message_count": read("message_count", 0),
            "last_file": read("last_file", None),
            "subject": read("subject", subject_id),
            "saved_at": datetime.now().isoformat(),
        }
        db.write('chat_history', record_key, record)
        msg_count = len(read('full_messages', []))
        print(f"โœ… Saved chat for {username} / subject={subject_id} ({msg_count} messages)")
    except Exception as e:
        print(f"โŒ Error saving chat to memory: {e}")
def load_user_chat_from_db(username, subject_id):
    """Return the saved chat record for the user and subject, or ``None``.

    ``None`` is also returned (after logging) when the lookup raises.
    """
    try:
        record_key = make_chat_db_key(username, subject_id)
        # ``read_key`` already yields None for a missing key.
        return get_db().read_key('chat_history', record_key)
    except Exception as e:
        print(f"โŒ Error loading chat from memory: {e}")
        return None
def clear_user_subject_chat_from_db(username, subject_id):
    """Delete the saved chat record for the user and subject; errors are logged."""
    try:
        record_key = make_chat_db_key(username, subject_id)
        get_db().delete('chat_history', record_key)
        print(f"๐Ÿ—‘๏ธ Cleared chat for {username} / subject={subject_id}")
    except Exception as e:
        print(f"โŒ Error clearing chat: {e}")
def get_all_user_chats_from_db(username):
    """Return ``{subject_id: record}`` for every saved chat of ``username``.

    Records are found by the key prefix ``"<username>__"``. That prefix also
    matches keys of any other user whose name starts with it (for example
    ``"ali__x"`` for ``"ali"``), so each record's stored ``username`` field
    is checked and foreign records are dropped. Records without that field
    are kept for backward compatibility. Returns ``{}`` on error.
    """
    try:
        db = get_db()
        prefix = f"{username}__"
        records = db.find_keys_by_prefix('chat_history', prefix)
        result = {}
        for key, val in records.items():
            owner = val.get("username", username) if isinstance(val, dict) else username
            if owner != username:
                # Prefix collision with another user's key; not ours.
                continue
            result[key[len(prefix):]] = val
        return result
    except Exception as e:
        print(f"โŒ Error getting all user chats: {e}")
        return {}
================================================================================
FILE 19: database\telegram.py
FULL PATH: main_project\database\telegram.py
================================================================================
"""
Telegram verification database.
Pure in-memory via MemoryDB. No local files.
"""
from memory_db import get_db
def load_telegram_db():
    """Return the result of reading the whole ``telegram`` table."""
    db = get_db()
    return db.read('telegram')
def save_telegram_db(data):
    """Replace the whole ``telegram`` table with ``data``."""
    db = get_db()
    db.write_full('telegram', data)
def get_telegram_user(user_id):
    """Return the ``telegram`` record for ``user_id`` (keys are stored as strings)."""
    record_key = str(user_id)
    return get_db().read_key('telegram', record_key)
def set_telegram_user(user_id, data):
    """Store ``data`` in ``telegram`` under the string form of ``user_id``."""
    record_key = str(user_id)
    get_db().write('telegram', record_key, data)
================================================================================
FILE 20: database\users.py
FULL PATH: main_project\database\users.py
================================================================================
"""
Users database operations.
Pure in-memory via MemoryDB. No local files.
"""
from memory_db import get_db
def init_users_db():
    """Log how many records the shared DB currently holds in ``users``."""
    total = get_db().count('users')
    print(f" โœ… Users DB ready: {total} users in memory")
def load_users_db():
    """Return the result of reading the whole ``users`` table."""
    db = get_db()
    return db.read('users')
def save_users_db(data):
    """Replace the whole ``users`` table with ``data``."""
    db = get_db()
    db.write_full('users', data)
def get_user(username):
    """Return the ``users`` record stored under ``username``."""
    db = get_db()
    return db.read_key('users', username)
def update_user(username, update_fn):
    """Pass ``update_fn`` to the DB's ``update_key`` for ``username`` and return its result."""
    db = get_db()
    return db.update_key('users', username, update_fn)
def set_user(username, user_data):
    """Store ``user_data`` in ``users`` under ``username``."""
    db = get_db()
    db.write('users', username, user_data)
def user_exists(username):
    """Return whether ``users`` has an entry for ``username``."""
    db = get_db()
    return db.has_key('users', username)
================================================================================
FILE 21: exam\__init__.py
FULL PATH: main_project\exam\__init__.py
================================================================================
"""
Exam package.
MCQ exam generation and scoring.
"""
================================================================================
FILE 22: exam\generator.py
FULL PATH: main_project\exam\generator.py
================================================================================
"""
Exam question generator.
Updated to use HTTP connection pools.
"""
import re
import json
import random
import os
from config import GPT_URL, GPT_TIMEOUT
from http_pool import gpt_session
from subjects.loader import subject_loader
def load_final_mcq(subject_id):
    """Read ``FINAL/<subject_id>.txt`` (UTF-8) and return its text.

    Returns an empty string when the file is missing or cannot be read; the
    outcome is logged in every case.
    """
    mcq_path = os.path.join("FINAL", f"{subject_id}.txt")
    try:
        with open(mcq_path, 'r', encoding='utf-8') as handle:
            text = handle.read()
        print(f" โœ“ Loaded FINAL/{subject_id}.txt ({len(text)} chars)")
        return text
    except FileNotFoundError:
        print(f" โš  FINAL/{subject_id}.txt not found")
    except Exception as e:
        print(f" โŒ Error loading FINAL/{subject_id}.txt: {e}")
    return ""
def generate_exam_questions(subject_id, door_file, question_count, difficulty, username):
    """Ask the GPT endpoint for MCQ questions about one door of a subject.

    The prompt is assembled from the subject's ``main.txt`` text, the text of
    ``door_file`` and the optional ``FINAL/<subject_id>.txt`` reference
    questions, then posted through the pooled ``gpt_session``.

    Returns a ``(raw_reply, error)`` pair: ``(text, None)`` on success, or
    ``(None, message)`` when the subject/door is missing, the reply is empty,
    or the request fails. ``username`` is accepted but not used here.
    """
    subject_data = subject_loader.load(subject_id)
    if not subject_data:
        return None, f"ู…ุฌู„ุฏ ุงู„ู…ุงุฏุฉ '{subject_id}' ุบูŠุฑ ู…ูˆุฌูˆุฏ"

    door_content = subject_data.get(door_file, "")
    if not door_content:
        return None, f"ู…ู„ู ุงู„ุจุงุจ '{door_file}' ุบูŠุฑ ู…ูˆุฌูˆุฏ ุฃูˆ ูุงุฑุบ"

    final_mcq_content = load_final_mcq(subject_id)
    main_content = subject_data.get("main.txt", "")

    # Human-readable difficulty descriptions inserted into the prompt.
    difficulty_labels = {
        "easy": "ุณู‡ู„ุฉ - ุฃุณุฆู„ุฉ ู…ุจุงุดุฑุฉ ุชุนุชู…ุฏ ุนู„ู‰ ุงู„ุญูุธ ูˆุงู„ูู‡ู… ุงู„ุฃุณุงุณูŠ",
        "medium": "ู…ุชูˆุณุทุฉ - ุฃุณุฆู„ุฉ ุชุนุชู…ุฏ ุนู„ู‰ ุงู„ูู‡ู… ูˆุงู„ุชุทุจูŠู‚",
        "hard": "ุตุนุจุฉ - ุฃุณุฆู„ุฉ ุชุญู„ูŠู„ูŠุฉ ูˆุชุทุจูŠู‚ูŠุฉ ุนู…ูŠู‚ุฉ ุชุชุทู„ุจ ูู‡ู…ุงู‹ ู…ุชู‚ุฏู…ุงู‹",
    }
    difficulty_text = difficulty_labels.get(difficulty, "ู…ุชูˆุณุทุฉ")

    # The prompt text below is sent verbatim to the model; do not reflow it.
    exam_prompt = f"""{main_content}
=== ู…ุญุชูˆู‰ ุงู„ุจุงุจ ุงู„ุฏุฑุงุณูŠ ===
{door_content}
=== ุฃุณุฆู„ุฉ ุงู„ุณู†ูˆุงุช ุงู„ุณุงุจู‚ุฉ (ู„ู„ุงุณุชุฆู†ุงุณ ูู‚ุท) ===
{final_mcq_content if final_mcq_content else "ู„ุง ุชูˆุฌุฏ ุฃุณุฆู„ุฉ ู…ุฑุฌุนูŠุฉ ู…ุชุงุญุฉ"}
=== ุชุนู„ูŠู…ุงุช ุชูˆู„ูŠุฏ ุงู„ุฃุณุฆู„ุฉ ===
ุฃู†ุช ู…ุฏุฑุณ ุฎุจูŠุฑ ู…ุชุฎุตุต ููŠ ุฅุนุฏุงุฏ ุฃุณุฆู„ุฉ ุงู„ุงุฎุชูŠุงุฑ ู…ู† ู…ุชุนุฏุฏ (MCQ).
ู…ู‡ู…ุชูƒ: ุชูˆู„ูŠุฏ {question_count} ุณุคุงู„ ุงุฎุชูŠุงุฑ ู…ู† ู…ุชุนุฏุฏ ุจู…ุณุชูˆู‰ {difficulty_text}.
**ู‚ูˆุงุนุฏ ู…ู‡ู…ุฉ ุฌุฏุงู‹:**
1. ุงุนุชู…ุฏ ุจุดูƒู„ ุฑุฆูŠุณูŠ ุนู„ู‰ ู…ุญุชูˆู‰ ุงู„ุจุงุจ ุงู„ุฏุฑุงุณูŠ ุฃุนู„ุงู‡ ู„ุตูŠุงุบุฉ ุงู„ุฃุณุฆู„ุฉ
2. ูŠู…ูƒู†ูƒ ุงู„ุงุณุชุฆู†ุงุณ ุจุฃุณุฆู„ุฉ ุงู„ุณู†ูˆุงุช ุงู„ุณุงุจู‚ุฉ ู„ูู‡ู… ู†ู…ุท ุงู„ุฃุณุฆู„ุฉ ูˆุฃุณู„ูˆุจ ุงู„ู…ูุนูุฏู‘ูŠู†ุŒ ู„ูƒู† ู„ุง ุชู†ุณุฎู‡ุง ุญุฑููŠุงู‹
3. ูƒู„ ุณุคุงู„ ูŠุฌุจ ุฃู† ูŠุญุชูˆูŠ ุนู„ู‰ 4 ุฎูŠุงุฑุงุช: 3 ุฎุงุทุฆุฉ ูˆ1 ุตุญูŠุญุฉ
4. ุงุฌุนู„ ุงู„ุฎูŠุงุฑุงุช ุงู„ุฎุงุทุฆุฉ ู…ู†ุทู‚ูŠุฉ ูˆู‚ุฑูŠุจุฉ ู…ู† ุงู„ุตุญูŠุญุฉ
5. ุชู†ูˆู‘ุน ููŠ ุฃู†ูˆุงุน ุงู„ุฃุณุฆู„ุฉ: ุชุนุฑูŠูุŒ ุชุทุจูŠู‚ุŒ ู…ู‚ุงุฑู†ุฉุŒ ุงุณุชู†ุชุงุฌ
**ุตูŠุบุฉ ุงู„ุฅุฌุงุจุฉ ุงู„ู…ุทู„ูˆุจุฉ (JSON ูู‚ุท - ู„ุง ุชุถู ุฃูŠ ู†ุต ุฎุงุฑุฌ JSON):**
{{
"questions": [
{{
"question": "ู†ุต ุงู„ุณุคุงู„ ู‡ู†ุง",
"correct": "ุงู„ุฅุฌุงุจุฉ ุงู„ุตุญูŠุญุฉ",
"wrong1": "ุฎูŠุงุฑ ุฎุงุทุฆ ุฃูˆู„",
"wrong2": "ุฎูŠุงุฑ ุฎุงุทุฆ ุซุงู†ูŠ",
"wrong3": "ุฎูŠุงุฑ ุฎุงุทุฆ ุซุงู„ุซ"
}}
]
}}
ุงู„ุขู† ุฃู†ุดุฆ {question_count} ุณุคุงู„ ุจู…ุณุชูˆู‰ {difficulty_text} ุจุตูŠุบุฉ JSON ูู‚ุท:"""

    request_body = {
        "user_input": exam_prompt,
        "chat_history": [{"role": "system", "content": ""}],
        "temperature": 0.7,
        "top_p": 0.95,
        "max_completion_tokens": 4000,
    }

    try:
        reply = gpt_session.post(GPT_URL, json=request_body, timeout=GPT_TIMEOUT)
        reply.raise_for_status()
        generated = reply.json().get("assistant_response", "")
        if generated:
            print(f"โœ… Exam generation complete: {len(generated)} chars")
            return generated, None
        return None, "ู„ู… ูŠุชู… ุงู„ุญุตูˆู„ ุนู„ู‰ ุงุณุชุฌุงุจุฉ ู…ู† ุงู„ุฎุงุฏู…"
    except Exception as exc:
        # Any transport, HTTP-status or decoding failure is reported to the caller as text.
        print(f"โŒ Exam generation error: {exc}")
        return None, str(exc)
def _mcq_text(value):
    """Normalize one MCQ field to a stripped string.

    Strings are stripped, numbers/booleans are converted with ``str`` (a model
    may emit a numeric answer such as ``4``), and anything else (``None``,
    lists, dicts) becomes the empty string so the caller treats it as missing.
    """
    if isinstance(value, str):
        return value.strip()
    if isinstance(value, (int, float)):
        return str(value)
    return ""


def parse_mcq_json(json_string):
    """Parse the JSON MCQ response into a list of question dicts.

    Args:
        json_string: Raw model reply; may be wrapped in Markdown code fences
            and may contain text around the JSON object.

    Returns:
        A list of ``{"question", "choices", "correct_answer"}`` dicts, where
        ``choices`` is a shuffled list of four ``{"text", "is_correct"}``
        entries. Returns ``[]`` when no JSON object can be parsed. Never
        raises.

    A malformed entry (non-dict item, ``null`` or non-text fields) is skipped
    on its own rather than aborting the whole parse, so the valid questions
    around it are kept.
    """
    questions = []
    seen = set()
    try:
        # Drop Markdown code fences the model may wrap around the JSON.
        json_string = re.sub(r'```(?:json)?\s*', '', json_string)
        json_string = re.sub(r'```', '', json_string).strip()
        # Greedy match: from the first "{" to the last "}" in the reply.
        match = re.search(r'\{.*\}', json_string, re.DOTALL)
        if not match:
            print("โŒ No JSON object found in response")
            return []
        data = json.loads(match.group())
        raw_questions = data.get("questions", [])
        if not isinstance(raw_questions, list):
            print("โŒ Parse error: 'questions' is not a list")
            return []
        for item in raw_questions:
            if not isinstance(item, dict):
                continue
            question_text = _mcq_text(item.get("question"))
            correct_text = _mcq_text(item.get("correct"))
            if not question_text or not correct_text:
                continue
            # De-duplicate on whitespace-normalized question text.
            dedup_key = re.sub(r'\s+', ' ', question_text).strip()
            if dedup_key in seen:
                continue
            seen.add(dedup_key)
            wrong_texts = [
                text
                for text in (_mcq_text(item.get(key)) for key in ("wrong1", "wrong2", "wrong3"))
                if text
            ]
            if not wrong_texts:
                continue
            # Pad to three distractors by repeating the last one supplied.
            while len(wrong_texts) < 3:
                wrong_texts.append(wrong_texts[-1])
            choices = [{"text": correct_text, "is_correct": True}]
            choices.extend({"text": w, "is_correct": False} for w in wrong_texts[:3])
            random.shuffle(choices)
            questions.append({
                "question": question_text,
                "choices": choices,
                "correct_answer": correct_text,
            })
        print(f"โœ… Parsed {len(questions)} MCQ questions from JSON")
        return questions
    except json.JSONDecodeError as e:
        print(f"โŒ JSON parse error: {e}")
        return []
    except Exception as e:
        # Covers non-string input and any other unexpected shape.
        print(f"โŒ Parse error: {e}")
        return []
================================================================================
FILE 23: exam\routes.py
FULL PATH: main_project\exam\routes.py
================================================================================
"""
Exam API routes.
Updated to use MemoryDB for user lookups.
"""
import threading
from datetime import datetime
from flask import Blueprint, request, jsonify, session, render_template, redirect, url_for, flash
from auth.helpers import is_session_valid
from memory_db import get_db
from subjects.access import validate_user_subject_access
from subjects.definitions import SUBJECTS, get_subject_name
from subjects.loader import subject_loader
from exam.generator import generate_exam_questions, parse_mcq_json
# Blueprint that the exam routes in this module are registered on.
exam_bp = Blueprint('exam', __name__)
# In-process exam sessions keyed by username; a user has at most one entry,
# and the data lives only in this process's memory.
_exam_sessions = {}
# Lock taken by the helpers and routes below around accesses to _exam_sessions.
_exam_lock = threading.Lock()
def _create_exam_session(username, subject_id, door_file, question_count, difficulty):
    """Register a new exam session for ``username`` and return it.

    Any session already stored for the user is replaced. The new session
    starts in the ``"generating"`` state with no questions.
    """
    fresh_session = {
        "subject_id": subject_id,
        "door_file": door_file,
        "question_count": question_count,
        "difficulty": difficulty,
        "questions": [],
        "current_index": 0,
        "score": 0,
        "answers": [],
        "started_at": datetime.now().isoformat(),
        "status": "generating",
    }
    with _exam_lock:
        _exam_sessions[username] = fresh_session
        return _exam_sessions[username]
def _get_exam_session(username):
    """Return the stored session dict for ``username``, or ``None`` if absent."""
    with _exam_lock:
        if username in _exam_sessions:
            return _exam_sessions[username]
        return None
def _clear_exam_session(username):
    """Drop the exam session of ``username`` if one exists, logging the removal."""
    with _exam_lock:
        if username not in _exam_sessions:
            return
        _exam_sessions.pop(username)
        print(f"๐Ÿ—‘๏ธ Cleared exam session for {username}")
@exam_bp.route('/exam/<subject_id>')
def exam_page(subject_id):
    """Render the exam setup page for ``subject_id``.

    Redirects to the login page when the session is invalid, and to the
    dashboard when the account is unverified, the user has no access to the
    subject, or the subject data cannot be loaded.
    """
    if not is_session_valid():
        session.clear()
        flash('ุงู†ุชู‡ุช ุงู„ุฌู„ุณุฉุŒ ูŠุฑุฌู‰ ุชุณุฌูŠู„ ุงู„ุฏุฎูˆู„ ู…ุฑุฉ ุฃุฎุฑู‰.', 'error')
        return redirect(url_for('auth.login'))

    username = session['username']
    user_data = get_db().read_key('users', username)
    if not (user_data and user_data.get('verified', False)):
        flash('ุญุณุงุจูƒ ุบูŠุฑ ู…ูุนู‘ู„. ุชูˆุงุตู„ ู…ุน ุงู„ุฅุฏุงุฑุฉ.', 'error')
        return redirect(url_for('dashboard'))

    has_access, access_error = validate_user_subject_access(username, subject_id)
    if not has_access:
        flash(access_error, 'error')
        return redirect(url_for('dashboard'))

    subject_data = subject_loader.load(subject_id)
    if not subject_data:
        flash(f"ู…ุฌู„ุฏ ุงู„ู…ุงุฏุฉ '{subject_id}' ุบูŠุฑ ู…ูˆุฌูˆุฏ ุนู„ู‰ ุงู„ุฎุงุฏู….", 'error')
        return redirect(url_for('dashboard'))

    subject_name = get_subject_name(user_data.get('student_type', 'ุนู„ู…ูŠ'), subject_id)

    # One selectable "door" per file listed under "_p_files", numbered from 1.
    doors = [
        {"file": door_file, "label": f"ุงู„ุจุงุจ {position}", "index": position}
        for position, door_file in enumerate(subject_data.get("_p_files", []), 1)
    ]
    return render_template(
        'exam.html',
        username=username,
        subject_id=subject_id,
        subject_name=subject_name,
        doors=doors,
        doors_count=len(doors),
    )
def _json_text_field(data, key, default=''):
    """Return ``data[key]`` stripped when it is a string, else ``''``.

    Non-string values (numbers, lists, ``null``) map to the empty string so
    the caller's validation rejects them with a 400 instead of crashing on
    ``.strip()``.
    """
    value = data.get(key, default)
    return value.strip() if isinstance(value, str) else ''


@exam_bp.route('/exam/generate', methods=['POST'])
def exam_generate():
    """Generate MCQ questions for the exam.

    Expects a JSON body with ``subject_id``, ``door_file``, ``question_count``
    (one of 5/10/15/30, default 10) and ``difficulty`` (easy/medium/hard,
    default medium).

    Responses:
        401 when the session is invalid; 400 for a missing/invalid field
        (including a missing or malformed JSON body); 403 when the user has
        no access to the subject; 404 when the subject cannot be loaded; 500
        when generation or parsing fails; otherwise 200 with the questions.
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    # silent=True turns a missing or malformed JSON body into None instead of
    # an unhandled error; anything that is not a JSON object is treated as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    username = session['username']
    subject_id = _json_text_field(data, 'subject_id')
    door_file = _json_text_field(data, 'door_file')
    difficulty = _json_text_field(data, 'difficulty', 'medium')
    try:
        question_count = int(data.get('question_count', 10))
    except (TypeError, ValueError):
        # Non-numeric count: same answer as an out-of-range count.
        return jsonify({"error": "ุนุฏุฏ ุงู„ุฃุณุฆู„ุฉ ุบูŠุฑ ุตุญูŠุญ"}), 400

    if not subject_id:
        return jsonify({"error": "ูŠุฑุฌู‰ ุชุญุฏูŠุฏ ุงู„ู…ุงุฏุฉ"}), 400
    if not door_file:
        return jsonify({"error": "ูŠุฑุฌู‰ ุงุฎุชูŠุงุฑ ุงู„ุจุงุจ"}), 400
    if question_count not in [5, 10, 15, 30]:
        return jsonify({"error": "ุนุฏุฏ ุงู„ุฃุณุฆู„ุฉ ุบูŠุฑ ุตุญูŠุญ"}), 400
    if difficulty not in ['easy', 'medium', 'hard']:
        return jsonify({"error": "ู…ุณุชูˆู‰ ุงู„ุตุนูˆุจุฉ ุบูŠุฑ ุตุญูŠุญ"}), 400

    has_access, access_error = validate_user_subject_access(username, subject_id)
    if not has_access:
        return jsonify({"error": access_error, "code": "no_access"}), 403

    subject_data = subject_loader.load(subject_id)
    if not subject_data:
        return jsonify({"error": f"ู…ุฌู„ุฏ ุงู„ู…ุงุฏุฉ '{subject_id}' ุบูŠุฑ ู…ูˆุฌูˆุฏ"}), 404

    # Only files listed under "_p_files" may be requested as a door.
    p_files = subject_data.get("_p_files", [])
    if door_file not in p_files:
        return jsonify({"error": f"ุงู„ุจุงุจ '{door_file}' ุบูŠุฑ ู…ูˆุฌูˆุฏ ููŠ ู‡ุฐู‡ ุงู„ู…ุงุฏุฉ"}), 400

    print(f"๐Ÿ“ Generating exam: user={username}, subject={subject_id}, "
          f"door={door_file}, count={question_count}, difficulty={difficulty}")

    # The session is registered first (status "generating") and removed again
    # on every failure path below.
    _create_exam_session(username, subject_id, door_file, question_count, difficulty)
    raw_json, error = generate_exam_questions(
        subject_id=subject_id,
        door_file=door_file,
        question_count=question_count,
        difficulty=difficulty,
        username=username
    )
    if error:
        _clear_exam_session(username)
        return jsonify({"error": f"ูุดู„ ุชูˆู„ูŠุฏ ุงู„ุฃุณุฆู„ุฉ: {error}"}), 500
    if not raw_json:
        _clear_exam_session(username)
        return jsonify({"error": "ู„ู… ูŠุชู… ุงู„ุญุตูˆู„ ุนู„ู‰ ุฃุณุฆู„ุฉ ู…ู† ุงู„ุฎุงุฏู…"}), 500

    questions = parse_mcq_json(raw_json)
    if not questions:
        _clear_exam_session(username)
        return jsonify({"error": "ูุดู„ ุชุญู„ูŠู„ ุงู„ุฃุณุฆู„ุฉ. ุญุงูˆู„ ู…ุฑุฉ ุฃุฎุฑู‰."}), 500

    # The model may return more questions than requested; keep the first N.
    # (The list is non-empty here and N >= 5, so the slice cannot be empty.)
    questions = questions[:question_count]

    with _exam_lock:
        # The session may have been cleared meanwhile; only update it if present.
        if username in _exam_sessions:
            _exam_sessions[username]["questions"] = questions
            _exam_sessions[username]["status"] = "active"
            _exam_sessions[username]["total"] = len(questions)

    print(f"โœ… Exam ready: {len(questions)} questions for {username}")
    return jsonify({
        "success": True,
        "questions": questions,
        "total": len(questions),
        "subject_id": subject_id,
        "door_file": door_file,
        "difficulty": difficulty,
        "question_count": len(questions)
    })
def _grade_for_percentage(percentage):
    """Return the ``(grade_msg, grade_color)`` pair for a percentage score."""
    if percentage >= 90:
        return "ู…ู…ุชุงุฒ! ๐ŸŒŸ", "success"
    if percentage >= 75:
        return "ุฌูŠุฏ ุฌุฏุงู‹! ๐Ÿ‘", "info"
    if percentage >= 60:
        return "ุฌูŠุฏ ๐Ÿ‘Œ", "warning"
    if percentage >= 50:
        return "ู…ู‚ุจูˆู„", "warning"
    return "ูŠุญุชุงุฌ ุฅู„ู‰ ู…ุฑุงุฌุนุฉ ๐Ÿ“š", "danger"


@exam_bp.route('/exam/submit', methods=['POST'])
def exam_submit():
    """Submit exam answers and get score.

    Expects a JSON body ``{"answers": [{"question_index": int,
    "selected_answer": str}, ...]}``. Questions without a matching answer
    count as wrong. When several answers name the same question, the first
    one wins.

    Responses:
        401 when the session is invalid; 400 when the payload is malformed or
        there is no active exam session with questions; otherwise 200 with
        score, percentage, grade and per-question results. The stored session
        is marked ``done`` on success.
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    # silent=True turns a missing or malformed JSON body into None instead of
    # an unhandled error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = session['username']
    answers = data.get('answers', [])
    if not isinstance(answers, list):
        # Reject rather than grade: a malformed payload must not close the exam with 0.
        return jsonify({"error": "Invalid answers payload", "code": "bad_request"}), 400

    exam_session = _get_exam_session(username)
    if not exam_session:
        return jsonify({"error": "ู„ุง ุชูˆุฌุฏ ุฌู„ุณุฉ ุงู…ุชุญุงู† ู†ุดุทุฉ"}), 400
    if exam_session.get('status') != 'active':
        return jsonify({"error": "ุฌู„ุณุฉ ุงู„ุงู…ุชุญุงู† ุบูŠุฑ ู†ุดุทุฉ"}), 400
    questions = exam_session.get('questions', [])
    if not questions:
        return jsonify({"error": "ู„ุง ุชูˆุฌุฏ ุฃุณุฆู„ุฉ ููŠ ุฌู„ุณุฉ ุงู„ุงู…ุชุญุงู†"}), 400

    # Index the answers once (first answer per question index wins) instead of
    # rescanning the whole list for every question.
    selected_by_index = {}
    for ans in answers:
        if not isinstance(ans, dict):
            continue
        try:
            selected_by_index.setdefault(
                ans.get('question_index'), ans.get('selected_answer', '')
            )
        except TypeError:
            # Unhashable index (list/dict) can never equal a question position.
            continue

    score = 0
    results = []
    for i, question in enumerate(questions):
        correct_answer = question['correct_answer']
        selected_answer = selected_by_index.get(i)  # None when unanswered
        is_correct = (selected_answer == correct_answer)
        if is_correct:
            score += 1
        results.append({
            "question_index": i,
            "question": question['question'],
            "correct_answer": correct_answer,
            "selected_answer": selected_answer,
            "is_correct": is_correct,
            "choices": question['choices']
        })

    total = len(questions)
    percentage = round((score / total) * 100, 1) if total > 0 else 0
    grade_msg, grade_color = _grade_for_percentage(percentage)

    with _exam_lock:
        # The session may have been cleared meanwhile; only update it if present.
        if username in _exam_sessions:
            _exam_sessions[username]['status'] = 'done'
            _exam_sessions[username]['score'] = score
            _exam_sessions[username]['answers'] = answers
            _exam_sessions[username]['finished_at'] = datetime.now().isoformat()

    print(f"โœ… Exam submitted: {username} scored {score}/{total} ({percentage}%)")
    return jsonify({
        "success": True,
        "score": score,
        "total": total,
        "percentage": percentage,
        "grade_msg": grade_msg,
        "grade_color": grade_color,
        "results": results
    })
@exam_bp.route('/exam/clear', methods=['POST'])
def exam_clear():
    """Discard the caller's exam session (401 when the session is invalid)."""
    if is_session_valid():
        _clear_exam_session(session['username'])
        return jsonify({"success": True, "message": "ุชู… ู…ุณุญ ุฌู„ุณุฉ ุงู„ุงู…ุชุญุงู†"})
    return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401
@exam_bp.route('/exam/status', methods=['GET'])
def exam_status():
    """Report whether the caller has an exam session and summarize it.

    Returns 401 when the session is invalid, ``{"has_exam": False}`` when no
    exam session is stored, otherwise the session's metadata (no questions).
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    exam_session = _get_exam_session(session['username'])
    if not exam_session:
        return jsonify({"has_exam": False})

    summary = {"has_exam": True}
    for field in ('status', 'subject_id', 'door_file', 'question_count', 'difficulty'):
        summary[field] = exam_session.get(field)
    summary["total"] = exam_session.get('total', 0)
    summary["started_at"] = exam_session.get('started_at')
    return jsonify(summary)
================================================================================
FILE 24: github_storage.py
FULL PATH: main_project\github_storage.py
================================================================================
"""
GitHub-based persistent storage.
Saves/loads JSON database files to/from a GitHub repository.
All data lives in RAM during runtime.
Manual backup/restore via GitHub API.
Usage:
storage = GitHubStorage.get_instance()
# Save all DBs to GitHub
storage.push_all()
# Load all DBs from GitHub
storage.pull_all()
# Save single DB
storage.push_file('users.json', data_dict)
# Load single DB
data = storage.pull_file('users.json')
"""
import os
import json
import base64
import threading
from datetime import datetime
try:
from http_pool import gpt_session as _http_session
except ImportError:
import requests as _http_session
class GitHubStorage:
    """
    GitHub API storage backend.

    Pushes/pulls the JSON database files to/from a GitHub repository through
    the repository "contents" REST endpoint. Use :meth:`get_instance` to
    obtain the process-wide singleton.

    Configuration is read from environment variables when the class is
    defined. The access token has NO built-in default: without
    ``GITHUB_TOKEN`` the storage reports itself as not configured and every
    operation returns its "not configured" result.
    """
    _instance = None
    _init_lock = threading.Lock()

    # --- Configuration (environment variables) ---
    # SECURITY: a personal access token was previously hard-coded here as the
    # fallback value. Credentials must never live in source; any token that
    # was committed should be treated as leaked and revoked.
    GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "")
    GITHUB_REPO = os.environ.get("GITHUB_REPO", "serversclass-dev/db")
    GITHUB_BRANCH = os.environ.get("GITHUB_BRANCH", "main")
    GITHUB_DATA_DIR = os.environ.get("GITHUB_DATA_DIR", "db")  # folder inside repo
    GITHUB_API_BASE = "https://api.github.com"

    # Files to sync: MemoryDB store name -> file name in the repository.
    DB_FILES = {
        'users': 'users.json',
        'telegram': 'users_db.json',
        'cards': 'cards.json',
        'chat_history': 'chat_history_db.json',
    }

    @classmethod
    def get_instance(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._init_lock:
                # Re-check under the lock so only one instance is ever built.
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def __init__(self):
        self._lock = threading.Lock()
        self._file_shas = {}  # Last known blob SHA per file (needed for GitHub updates)
        # Configured means: a token longer than 10 chars and an "owner/name" repo.
        self._configured = bool(
            self.GITHUB_TOKEN
            and len(self.GITHUB_TOKEN) > 10
            and self.GITHUB_REPO
            and "/" in self.GITHUB_REPO
        )
        if self._configured:
            print(f" โœ… GitHubStorage initialized")
            print(f" Repo: {self.GITHUB_REPO}")
            print(f" Branch: {self.GITHUB_BRANCH}")
            print(f" Data dir: {self.GITHUB_DATA_DIR}/")
        else:
            print(f" โš ๏ธ GitHubStorage NOT configured - set GITHUB_TOKEN and GITHUB_REPO")
            # Never echo any part of the token into the logs.
            print(f" Token set: {bool(self.GITHUB_TOKEN)}")
            print(f" Current repo: {self.GITHUB_REPO}")

    def _headers(self):
        """Return the HTTP headers sent with every GitHub API request."""
        return {
            "Authorization": f"token {self.GITHUB_TOKEN}",
            "Accept": "application/vnd.github.v3+json",
            "Content-Type": "application/json",
        }

    def _file_url(self, filename):
        """Build GitHub API URL for a file."""
        path = f"{self.GITHUB_DATA_DIR}/{filename}" if self.GITHUB_DATA_DIR else filename
        return (
            f"{self.GITHUB_API_BASE}/repos/{self.GITHUB_REPO}"
            f"/contents/{path}?ref={self.GITHUB_BRANCH}"
        )

    def _check_configured(self):
        """Return ``(True, None)`` when configured, else ``(False, message)``."""
        if not self._configured:
            return False, "GitHubStorage not configured. Set GITHUB_TOKEN and GITHUB_REPO environment variables."
        return True, None

    # --- PULL (GitHub -> Memory) ---
    def pull_file(self, filename):
        """
        Download a single JSON file from GitHub.

        Returns the parsed dict. Returns an empty dict when the storage is not
        configured, the file does not exist yet (404), the API answers 403,
        the content is empty or not a JSON object, or any other error occurs.
        Never raises. On success the file's SHA is cached for later updates.
        """
        ok, err = self._check_configured()
        if not ok:
            print(f" โŒ {err}")
            return {}
        url = self._file_url(filename)
        try:
            response = _http_session.get(url, headers=self._headers(), timeout=30)
            if response.status_code == 404:
                # File doesn't exist on GitHub yet - totally normal for first run
                print(f" โ„น๏ธ {filename} not on GitHub yet (will be created on first backup)")
                return {}
            if response.status_code == 403:
                # NOTE(review): every 403 is reported as rate limiting; a
                # permissions problem would surface with the same message.
                remaining = response.headers.get('X-RateLimit-Remaining', '?')
                print(f" โŒ GitHub API rate limited. Remaining: {remaining}")
                return {}
            response.raise_for_status()
            data = response.json()
            # Save SHA for future updates
            self._file_shas[filename] = data.get('sha', '')
            # Decode content (base64)
            content_b64 = data.get('content', '')
            if not content_b64:
                print(f" โš ๏ธ {filename} is empty on GitHub")
                return {}
            content_bytes = base64.b64decode(content_b64)
            content_str = content_bytes.decode('utf-8')
            parsed = json.loads(content_str)
            if not isinstance(parsed, dict):
                print(f" โš ๏ธ {filename} on GitHub is not a dict, ignoring")
                return {}
            record_count = len(parsed)
            print(f" โœ… Pulled {filename} from GitHub ({record_count} records)")
            return parsed
        except json.JSONDecodeError as e:
            print(f" โŒ {filename} on GitHub has invalid JSON: {e}")
            return {}
        except Exception as e:
            print(f" โŒ Error pulling {filename} from GitHub: {e}")
            return {}

    def pull_all(self):
        """
        Download ALL database files from GitHub.

        Returns ``(stores, error)`` where ``stores`` maps every store name in
        ``DB_FILES`` to its dict (empty when missing or unreadable). ``error``
        is ``None`` unless the storage is not configured.
        """
        ok, err = self._check_configured()
        if not ok:
            return {s: {} for s in self.DB_FILES}, err
        print(f"\n ๐Ÿ“ฅ Pulling all databases from GitHub...")
        results = {}
        for store_name, filename in self.DB_FILES.items():
            data = self.pull_file(filename)
            results[store_name] = data if data else {}
        total_records = sum(len(v) for v in results.values())
        if total_records == 0:
            print(f" โ„น๏ธ All databases empty (first run - files will be created on first /backup_db)")
        else:
            print(f" โœ… Pull complete: {total_records} total records across {len(results)} stores")
        return results, None

    def push_file(self, filename, data_dict, message=None):
        """
        Upload a single JSON file to GitHub.

        Creates the file if it doesn't exist, updates it otherwise (an update
        needs the file's current SHA, which is cached or fetched first).

        Args:
            filename: File name inside ``GITHUB_DATA_DIR``.
            data_dict: JSON-serializable data to store.
            message: Commit message; a timestamped default is generated when
                omitted.

        Returns:
            ``(True, None)`` on success, ``(False, error_message)`` otherwise.
        """
        ok, err = self._check_configured()
        if not ok:
            return False, err
        url = self._file_url(filename)
        if message is None:
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            record_count = len(data_dict) if isinstance(data_dict, dict) else 0
            message = f"Backup {filename} - {record_count} records - {timestamp}"
        # Encode content to base64
        content_str = json.dumps(data_dict, indent=2, ensure_ascii=False)
        content_b64 = base64.b64encode(content_str.encode('utf-8')).decode('utf-8')
        payload = {
            "message": message,
            "content": content_b64,
            "branch": self.GITHUB_BRANCH,
        }
        # Get current SHA if file exists (needed for update, not for create)
        current_sha = self._file_shas.get(filename)
        if not current_sha:
            # Try to fetch SHA from GitHub (file might exist but we don't have SHA cached)
            try:
                check_resp = _http_session.get(url, headers=self._headers(), timeout=15)
                if check_resp.status_code == 200:
                    current_sha = check_resp.json().get('sha', '')
                    self._file_shas[filename] = current_sha
                elif check_resp.status_code == 404:
                    # File doesn't exist yet - will be CREATED
                    current_sha = None
                    print(f" ๐Ÿ“ {filename} will be created on GitHub")
            except Exception:
                # Best effort: without a SHA the PUT below is attempted as a
                # create and GitHub's answer decides the outcome.
                current_sha = None
        if current_sha:
            payload["sha"] = current_sha
        try:
            response = _http_session.put(url, headers=self._headers(), json=payload, timeout=30)
            if response.status_code == 403:
                remaining = response.headers.get('X-RateLimit-Remaining', '?')
                reset_time = response.headers.get('X-RateLimit-Reset', '?')
                err_msg = f"GitHub API rate limited. Remaining: {remaining}, Reset: {reset_time}"
                print(f" โŒ {err_msg}")
                return False, err_msg
            if response.status_code == 409:
                # SHA conflict - pull fresh SHA and retry
                print(f" โš ๏ธ SHA conflict for {filename}, retrying...")
                try:
                    check_resp = _http_session.get(
                        self._file_url(filename), headers=self._headers(), timeout=15
                    )
                    if check_resp.status_code == 200:
                        fresh_sha = check_resp.json().get('sha', '')
                        payload["sha"] = fresh_sha
                        response = _http_session.put(
                            url, headers=self._headers(), json=payload, timeout=30
                        )
                    elif check_resp.status_code == 404:
                        # File was deleted? Create fresh
                        payload.pop("sha", None)
                        response = _http_session.put(
                            url, headers=self._headers(), json=payload, timeout=30
                        )
                except Exception as retry_err:
                    return False, f"Retry failed: {retry_err}"
            if response.status_code == 422:
                # Unprocessable - might need to remove SHA for fresh create.
                # Only retry when the payload actually changes.
                if "sha" in payload:
                    del payload["sha"]
                    response = _http_session.put(
                        url, headers=self._headers(), json=payload, timeout=30
                    )
            if response.status_code in [200, 201]:
                resp_data = response.json()
                new_sha = resp_data.get('content', {}).get('sha', '')
                if new_sha:
                    # Remember the new SHA so the next push can update directly.
                    self._file_shas[filename] = new_sha
                action = "Created" if response.status_code == 201 else "Updated"
                record_count = len(data_dict) if isinstance(data_dict, dict) else 0
                print(f" โœ… {action} {filename} on GitHub ({record_count} records)")
                return True, None
            else:
                err_text = ""
                try:
                    err_text = response.json().get('message', response.text[:200])
                except Exception:
                    err_text = response.text[:200]
                err_msg = f"GitHub API error {response.status_code}: {err_text}"
                print(f" โŒ {err_msg}")
                return False, err_msg
        except Exception as e:
            err_msg = f"Error pushing {filename} to GitHub: {e}"
            print(f" โŒ {err_msg}")
            return False, err_msg

    def push_all(self, data_dict_map=None):
        """
        Upload ALL database files to GitHub.

        Args:
            data_dict_map: Optional ``{store_name: data}`` mapping. When
                omitted, every store listed in ``db.STORES`` is read from
                MemoryDB.

        Returns:
            ``(all_succeeded, errors)`` where ``errors`` is a list of
            per-file error strings (empty on full success).
        """
        ok, err = self._check_configured()
        if not ok:
            return False, [err]
        # If no data provided, read from MemoryDB
        if data_dict_map is None:
            try:
                from memory_db import get_db
                db = get_db()
                data_dict_map = {}
                for store_name in db.STORES:
                    data_dict_map[store_name] = db.read(store_name)
            except Exception as e:
                return False, [f"Failed to read from MemoryDB: {e}"]
        print(f"\n ๐Ÿ“ค Pushing all databases to GitHub...")
        # One timestamp for the whole batch so all commit messages agree.
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        errors = []
        success_count = 0
        for store_name, filename in self.DB_FILES.items():
            data = data_dict_map.get(store_name, {})
            record_count = len(data) if isinstance(data, dict) else 0
            message = f"Backup {filename} - {record_count} records - {timestamp}"
            success, error = self.push_file(filename, data, message=message)
            if success:
                success_count += 1
            else:
                errors.append(f"{filename}: {error}")
        total = len(self.DB_FILES)
        print(f" {'โœ…' if not errors else 'โš ๏ธ'} Push complete: {success_count}/{total} files")
        if errors:
            for e in errors:
                print(f" โŒ {e}")
        return len(errors) == 0, errors

    # --- STATUS ---
    def get_status(self):
        """Get GitHub storage status and rate limit info.

        Returns a dict with ``configured`` and, when configured, the repo
        settings, the API rate-limit numbers (or an ``error`` entry if the
        lookup failed) and, per store, whether a SHA is cached for its file.
        """
        ok, err = self._check_configured()
        if not ok:
            return {
                "configured": False,
                "error": err
            }
        status = {
            "configured": True,
            "repo": self.GITHUB_REPO,
            "branch": self.GITHUB_BRANCH,
            "data_dir": self.GITHUB_DATA_DIR,
            "files": {},
            "rate_limit": None,
        }
        # Check rate limit
        try:
            resp = _http_session.get(
                f"{self.GITHUB_API_BASE}/rate_limit",
                headers=self._headers(),
                timeout=10
            )
            if resp.status_code == 200:
                rl = resp.json().get('resources', {}).get('core', {})
                status["rate_limit"] = {
                    "limit": rl.get('limit', 0),
                    "remaining": rl.get('remaining', 0),
                    "reset_at": datetime.fromtimestamp(
                        rl.get('reset', 0)
                    ).isoformat() if rl.get('reset') else None,
                    "used": rl.get('used', 0),
                }
        except Exception as e:
            status["rate_limit"] = {"error": str(e)}
        # Check each file
        for store_name, filename in self.DB_FILES.items():
            status["files"][store_name] = {
                "filename": filename,
                "has_sha": filename in self._file_shas,
            }
        return status
# โ”€โ”€โ”€ Singleton access โ”€โ”€โ”€
def get_github_storage():
    """Return the shared ``GitHubStorage`` instance."""
    storage = GitHubStorage.get_instance()
    return storage
================================================================================
FILE 25: gunicorn_config.py
FULL PATH: main_project\gunicorn_config.py
================================================================================
"""
Gunicorn configuration for Docker deployment.
SINGLE WORKER - because MemoryDB is in-process.
Gevent handles 200+ concurrent connections in 1 worker.
"""
import os
# SINGLE WORKER with gevent handles 200+ concurrent via greenlets
# Multiple workers = separate memory = session problems
worker_class = "gevent"
workers = 1
# Upper bound on simultaneous connections handled by the single worker.
worker_connections = 500
# Listen on all interfaces; the port comes from $PORT and falls back to 7860.
bind = f"0.0.0.0:{os.environ.get('PORT', 7860)}"
# Timeouts below are in seconds (per Gunicorn's settings reference).
timeout = 300
graceful_timeout = 120
keepalive = 65
max_requests = 0 # Don't restart worker (would lose memory)
max_requests_jitter = 0
preload_app = True
# "-" sends the access and error logs to the process's standard streams
# (per Gunicorn's settings reference - confirm for the deployed version).
accesslog = "-"
errorlog = "-"
# Log verbosity is taken from $LOG_LEVEL, defaulting to "info".
loglevel = os.environ.get("LOG_LEVEL", "info")
# NOTE(review): 0 is documented by Gunicorn as "unlimited" for the request
# line and header field size - confirm this is intended for a public service.
limit_request_line = 0
limit_request_fields = 200
limit_request_field_size = 0
def on_starting(server):
    """Gunicorn ``on_starting`` hook: print the startup banner.

    ``server`` is supplied by Gunicorn and is not used.
    """
    rule = "โ•" * 60
    print("\n" + rule)
    print(" ๐ŸŽ“ CORVO AI - Starting Production Server")
    print(" โšก 1 Worker + 500 Gevent Connections = 500 concurrent users")
    print(rule + "\n")
def on_exit(server):
    """Gunicorn ``on_exit`` hook: ask MemoryDB to shut down.

    Any failure (including a failed import) is printed and swallowed.
    ``server`` is supplied by Gunicorn and is not used.
    """
    try:
        from memory_db import get_db
        get_db().shutdown()
    except Exception as exc:
        print(f" โš ๏ธ Shutdown error: {exc}")
================================================================================
FILE 26: http_pool.py
FULL PATH: main_project\http_pool.py
================================================================================
"""
HTTP Connection Pool Manager.
Reuses TCP connections to external APIs instead of creating new ones per request.
This is THE key optimization for 200 concurrent users hitting external APIs.
Without this: each request opens a new TCP connection + TLS handshake = ~200ms overhead
With this: connections are reused, overhead drops to ~5ms
"""
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from config import (
GPT_URL, TTS_URL, TRANSCRIPT_URL, CLOUDINARY_URL,
GPT_POOL_SIZE, TTS_POOL_SIZE, MISTRAL_POOL_SIZE,
TRANSCRIPT_POOL_SIZE, CLOUDINARY_POOL_SIZE,
GPT_TIMEOUT, TTS_TIMEOUT, MISTRAL_TIMEOUT, TRANSCRIPT_TIMEOUT
)
def _create_session(pool_size, pool_block=True, retries=2):
    """
    Build a ``requests.Session`` backed by a pooled, retrying adapter.

    pool_size: used for both ``pool_connections`` and ``pool_maxsize``
    pool_block: forwarded to ``HTTPAdapter(pool_block=...)``
    retries: total automatic retries; GET and POST are retried on
        502/503/504 responses with a 0.5 backoff factor
    """
    adapter = HTTPAdapter(
        pool_connections=pool_size,
        pool_maxsize=pool_size,
        pool_block=pool_block,
        max_retries=Retry(
            total=retries,
            backoff_factor=0.5,
            status_forcelist=[502, 503, 504],
            allowed_methods=["POST", "GET"],
        ),
    )
    session = requests.Session()
    # The same adapter instance serves both schemes.
    for scheme in ("https://", "http://"):
        session.mount(scheme, adapter)
    return session
# --- Pre-built sessions for each external service ---
# Created once at import time; modules import these names to share the pools.
# GPT API pool - highest traffic
gpt_session = _create_session(GPT_POOL_SIZE)
# TTS pool - high traffic from board
tts_session = _create_session(TTS_POOL_SIZE)
# Mistral pool - streaming chat
mistral_session = _create_session(MISTRAL_POOL_SIZE)
# Transcript pool - voice uploads
transcript_session = _create_session(TRANSCRIPT_POOL_SIZE)
# Cloudinary pool - image uploads
cloudinary_session = _create_session(CLOUDINARY_POOL_SIZE)
# Icons8 pool - icon lookups (cached, low traffic); fixed size of 5, not configurable
icons8_session = _create_session(5)
# Claude/Board processor pool (sized like the GPT pool)
board_processor_session = _create_session(GPT_POOL_SIZE)
# Import-time summary of the configured pool sizes (the icons8 and
# board-processor pools are not listed).
print("โœ… HTTP Connection Pools initialized:")
print(f" GPT: {GPT_POOL_SIZE} connections")
print(f" TTS: {TTS_POOL_SIZE} connections")
print(f" Mistral: {MISTRAL_POOL_SIZE} connections")
print(f" Transcript: {TRANSCRIPT_POOL_SIZE} connections")
print(f" Cloudinary: {CLOUDINARY_POOL_SIZE} connections")
================================================================================
FILE 27: json_processor.py
FULL PATH: main_project\json_processor.py
================================================================================
import json
import re
from typing import Optional, Dict, Any, List
from http_pool import board_processor_session
class BoardProcessor:
    """Converts XML board descriptions to JSON text by calling a chat API."""

    def __init__(self, api_url: str = "https://corvo-ai-claude-4-6-opus.hf.space/chat"):
        self.api_url = api_url
        self.system_prompt = self._load_system_prompt()

    def _load_system_prompt(self) -> str:
        """Return the contents of ``system.txt``, or a built-in prompt if it is missing."""
        fallback_prompt = """You are a converter that transforms XML board data into JSON format.
Convert the given XML to JSON following these rules:
1. Extract all elements from the XML
2. Convert them to proper JSON structure
3. Wrap your JSON output in <json></json> tags
Example:
Input: <board><note color="yellow" x="100" y="200">Hello</note></board>
Output: <json>{"board": {"notes": [{"color": "yellow", "x": 100, "y": 200, "text": "Hello"}]}}</json>
Always wrap your final JSON output in <json></json> tags."""
        try:
            with open('system.txt', 'r', encoding='utf-8') as prompt_file:
                return prompt_file.read()
        except FileNotFoundError:
            return fallback_prompt

    def _call_ai_api(
        self,
        user_input: Optional[str] = None,
        chat_history: Optional[List[Dict[str, Any]]] = None,
        temperature: float = 0.9,
        top_p: float = 0.95,
        max_tokens: Optional[int] = None
    ) -> str:
        """POST one chat request through the pooled session.

        Returns the ``assistant_response`` field of the JSON reply (empty
        string when absent). Any failure is re-raised as a plain
        ``Exception`` with an "API request failed" message.
        """
        history = chat_history or []
        request_body = {
            "user_input": user_input,
            "chat_history": history,
            "temperature": temperature,
            "top_p": top_p,
            "max_tokens": max_tokens
        }
        try:
            reply = board_processor_session.post(
                self.api_url, json=request_body, timeout=120
            )
            reply.raise_for_status()
            return reply.json().get("assistant_response", "")
        except Exception as exc:
            raise Exception(f"API request failed: {str(exc)}")

    def _extract_json_from_response(self, response: str) -> str:
        """Return the first valid JSON text found in the AI response.

        Candidates are tried in order: ``<json>`` tags, a fenced json code
        block, any fenced code block, the widest ``{...}`` span, then the
        widest ``[...]`` span. Raises ``ValueError`` when none parses.
        """
        preview = response[:500]
        if len(response) > 500:
            preview += "..."
        separator = "=" * 80
        print("\n๐Ÿ“‹ DEBUG - Raw AI Response:")
        print(separator)
        print(preview)
        print(separator)

        # (pattern, capture group holding the candidate JSON text)
        candidates = (
            (r'<json>(.*?)</json>', 1),
            (r'```json\s*(.*?)\s*```', 1),
            (r'```\s*(.*?)\s*```', 1),
            (r'\{.*\}', 0),
            (r'\[.*\]', 0),
        )
        for pattern, group_index in candidates:
            found = re.search(pattern, response, re.DOTALL)
            if found is None:
                continue
            candidate = found.group(group_index).strip()
            try:
                json.loads(candidate)
            except json.JSONDecodeError:
                continue
            return candidate
        raise ValueError("No valid JSON found in AI response.")

    def convert_xml_to_json(
        self,
        xml_text: str,
        temperature: float = 0.9,
        top_p: float = 0.95,
        max_tokens: Optional[int] = None
    ) -> str:
        """Convert XML text to Board JSON text using pooled connection."""
        ai_response = self._call_ai_api(
            user_input=xml_text,
            chat_history=[{"role": "system", "content": self.system_prompt}],
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens
        )
        # Echo the first 200 characters of the input.
        snippet = xml_text[:200]
        if len(xml_text) > 200:
            snippet += "..."
        print(snippet)
        return self._extract_json_from_response(ai_response)
================================================================================
FILE 28: market\__init__.py
FULL PATH: main_project\market\__init__.py
================================================================================
"""
Market package.
Subject purchasing, balance management, card redemption.
"""
================================================================================
FILE 29: market\routes.py
FULL PATH: main_project\market\routes.py
================================================================================
"""
Market and balance routes.
Updated to use MemoryDB with atomic operations for thread safety.
"""
import random
from datetime import datetime
from flask import Blueprint, request, jsonify, session, render_template, redirect, url_for, flash
from auth.helpers import is_session_valid
from memory_db import get_db
from subjects.definitions import SUBJECTS
# Blueprint on which the market, balance and card routes below are registered.
market_bp = Blueprint('market', __name__)
@market_bp.route('/market')
def market():
    """Render the market page with the subjects the user does not own yet.

    An invalid session is cleared and the visitor is redirected to the login
    page with an error flash. Otherwise the user's track is looked up in
    SUBJECTS and every subject whose id is not in the user's
    ``purchased_subjects`` list is passed to ``market.html`` together with
    the current balance.
    """
    if not is_session_valid():
        session.clear()
        flash('ุงู†ุชู‡ุช ุงู„ุฌู„ุณุฉุŒ ูŠุฑุฌู‰ ุชุณุฌูŠู„ ุงู„ุฏุฎูˆู„ ู…ุฑุฉ ุฃุฎุฑู‰.', 'error')
        return redirect(url_for('auth.login'))

    username = session['username']
    profile = get_db().read_key('users', username) or {}

    # Missing profile fields fall back to the same defaults the purchase
    # route uses.
    track = SUBJECTS.get(profile.get('student_type', 'ุนู„ู…ูŠ'), [])
    owned = profile.get('purchased_subjects', ['islamic'])

    not_owned = []
    for entry in track:
        if entry['id'] in owned:
            continue
        not_owned.append(entry)

    return render_template(
        'market.html',
        username=username,
        available_subjects=not_owned,
        balance=profile.get('balance', 0),
    )
@market_bp.route('/buy_subject', methods=['POST'])
def buy_subject():
    """Buy one subject for the logged-in user and return a JSON result.

    Expects a JSON body with ``subject_id``. The ownership and balance
    checks and the balance deduction all happen inside a single
    ``db.update_key`` callback, so the decision and the write cannot be
    separated by a concurrent request.

    Returns:
        A JSON response with ``success`` and ``message``; on success it also
        carries ``new_balance``.
    """
    if not is_session_valid():
        return jsonify({'success': False, 'message': 'ุฌู„ุณุฉ ุบูŠุฑ ุตุงู„ุญุฉ'})

    username = session['username']

    # A missing or non-object JSON body is treated as "no subject given"
    # instead of raising on ``None.get``.
    payload = request.get_json(silent=True)
    subject_id = payload.get('subject_id') if isinstance(payload, dict) else None

    db = get_db()
    user_data = db.read_key('users', username)
    if not user_data:
        return jsonify({'success': False, 'message': 'ุงู„ู…ุณุชุฎุฏู… ุบูŠุฑ ู…ูˆุฌูˆุฏ'})

    student_type = user_data.get('student_type', 'ุนู„ู…ูŠ')
    all_subjects = SUBJECTS.get(student_type, [])
    subject = next((s for s in all_subjects if s['id'] == subject_id), None)
    if not subject:
        return jsonify({'success': False, 'message': 'ุงู„ู…ุงุฏุฉ ุบูŠุฑ ู…ูˆุฌูˆุฏุฉ'})

    price = subject['price']

    # The callback records why it accepted or rejected the purchase; the
    # value returned by update_key alone cannot tell "bought" from
    # "left unchanged".
    outcome = {'status': 'missing'}

    def do_purchase(u):
        if not u:
            return None
        owned = u.get('purchased_subjects')
        if owned is None:
            # Same default the read paths use for users without the key.
            owned = ['islamic']
        if subject_id in owned:
            outcome['status'] = 'owned'
            return None
        current_balance = u.get('balance', 0)
        if current_balance < price:
            outcome['status'] = 'insufficient'
            return None
        u['balance'] = current_balance - price
        u['purchased_subjects'] = list(owned) + [subject_id]
        outcome['status'] = 'ok'
        return u

    # Returning None from the callback leaves the stored record untouched.
    updated = db.update_key('users', username, do_purchase)

    if outcome['status'] == 'ok' and updated:
        return jsonify({
            'success': True,
            'message': f'ุชู… ุดุฑุงุก {subject["name"]} ุจู†ุฌุงุญ!',
            'new_balance': updated.get('balance', 0)
        })
    if outcome['status'] == 'owned':
        return jsonify({'success': False, 'message': 'ู„ุฏูŠูƒ ู‡ุฐู‡ ุงู„ู…ุงุฏุฉ ุจุงู„ูุนู„'})
    if outcome['status'] == 'insufficient':
        return jsonify({'success': False, 'message': 'ุฑุตูŠุฏูƒ ุบูŠุฑ ูƒุงูู'})
    return jsonify({'success': False, 'message': 'ูุดู„ ุนู…ู„ูŠุฉ ุงู„ุดุฑุงุก'})
@market_bp.route('/add_balance')
def add_balance():
    """Render the add-balance page showing the user's current balance.

    An invalid session is cleared and redirected to the login page with an
    error flash.
    """
    if is_session_valid():
        username = session['username']
        profile = get_db().read_key('users', username) or {}
        current_balance = profile.get('balance', 0)
        return render_template(
            'add_balance.html',
            username=username,
            balance=current_balance,
        )

    session.clear()
    flash('ุงู†ุชู‡ุช ุงู„ุฌู„ุณุฉุŒ ูŠุฑุฌู‰ ุชุณุฌูŠู„ ุงู„ุฏุฎูˆู„ ู…ุฑุฉ ุฃุฎุฑู‰.', 'error')
    return redirect(url_for('auth.login'))
@market_bp.route('/redeem_card', methods=['POST'])
def redeem_card():
    """Redeem a prepaid card for the logged-in user and return a JSON result.

    Expects a JSON body with ``code``. The card is claimed inside one
    ``db.update_key`` callback that both checks ``used`` and sets it, so two
    concurrent requests cannot redeem the same card. The card's ``class`` is
    added to the user's balance; class ``'35'`` additionally unlocks every
    subject of the scientific track and class ``'45'`` every subject of the
    literary track.

    Returns:
        A JSON response with ``success`` and ``message``; on success it also
        carries ``new_balance`` and ``unlocked_count``.
    """
    if not is_session_valid():
        return jsonify({'success': False, 'message': 'ูŠุฌุจ ุชุณุฌูŠู„ ุงู„ุฏุฎูˆู„ ุฃูˆู„ุงู‹'})

    # Tolerate a missing/non-object body and a non-string code instead of
    # raising; both are reported as "no code entered".
    payload = request.get_json(silent=True)
    raw_code = payload.get('code', '') if isinstance(payload, dict) else ''
    code = raw_code.strip() if isinstance(raw_code, str) else ''
    if not code:
        return jsonify({'success': False, 'message': 'ูŠุฑุฌู‰ ุฅุฏุฎุงู„ ูƒูˆุฏ ุงู„ูƒุฑุช'})

    username = session['username']
    db = get_db()

    # Check the account before touching the card so a card is never consumed
    # for a user record that does not exist.
    if not db.read_key('users', username):
        return jsonify({'success': False, 'message': 'ุงู„ู…ุณุชุฎุฏู… ุบูŠุฑ ู…ูˆุฌูˆุฏ'})

    claim = {'status': 'missing', 'card_class': None, 'value': 0}

    def claim_card(c):
        if not c:
            return None
        if c.get('used'):
            claim['status'] = 'used'
            return None
        try:
            value = int(c['class'])
        except (KeyError, TypeError, ValueError):
            claim['status'] = 'invalid'
            return None
        claim.update(status='claimed', card_class=c['class'], value=value)
        return {
            **c,
            'used': True,
            'used_by': username,
            'used_at': datetime.now().isoformat()
        }

    # Returning None from the callback leaves the stored card untouched.
    db.update_key('cards', code, claim_card)

    if claim['status'] == 'missing':
        return jsonify({'success': False, 'message': 'ูƒูˆุฏ ุงู„ูƒุฑุช ุบูŠุฑ ุตุญูŠุญ'})
    if claim['status'] == 'used':
        return jsonify({'success': False, 'message': 'ู‡ุฐุง ุงู„ูƒุฑุช ู…ุณุชุฎุฏู… ู…ุณุจู‚ุงู‹'})
    if claim['status'] != 'claimed':
        return jsonify({'success': False, 'message': 'ูุดู„ ุงุณุชุจุฏุงู„ ุงู„ูƒุฑุช'})

    card_class = claim['card_class']
    card_value = claim['value']
    message = f"ุชู… ุงุณุชุจุฏุงู„ ูƒุฑุช ูุฆุฉ {card_value} ุจู†ุฌุงุญ!"

    if card_class == '35':
        unlock_ids = [s['id'] for s in SUBJECTS['ุนู„ู…ูŠ']]
        extra_msg = " ูˆุชู… ูุชุญ ุฌู…ูŠุน ู…ูˆุงุฏ ุงู„ู‚ุณู… ุงู„ุนู„ู…ูŠ."
    elif card_class == '45':
        unlock_ids = [s['id'] for s in SUBJECTS['ุฃุฏุจูŠ']]
        extra_msg = " ูˆุชู… ูุชุญ ุฌู…ูŠุน ู…ูˆุงุฏ ุงู„ู‚ุณู… ุงู„ุฃุฏุจูŠ."
    else:
        unlock_ids = []
        extra_msg = ""

    applied = {'ok': False}

    def apply_card(u):
        if not u:
            return None
        u['balance'] = u.get('balance', 0) + card_value
        # dict.fromkeys de-duplicates while keeping the existing order.
        existing = list(u.get('purchased_subjects', []))
        u['purchased_subjects'] = list(dict.fromkeys(existing + unlock_ids))
        applied['ok'] = True
        return u

    updated_user = db.update_key('users', username, apply_card)

    if not applied['ok']:
        # The user record vanished between the check and the update: give
        # the card back so its value is not lost.
        def release_card(c):
            if not c:
                return None
            return {**c, 'used': False, 'used_by': None, 'used_at': None}

        db.update_key('cards', code, release_card)
        return jsonify({'success': False, 'message': 'ุงู„ู…ุณุชุฎุฏู… ุบูŠุฑ ู…ูˆุฌูˆุฏ'})

    return jsonify({
        'success': True,
        'message': message + extra_msg,
        'new_balance': updated_user.get('balance', 0),
        'unlocked_count': len(updated_user.get('purchased_subjects', []))
    })
@market_bp.route('/admin/generate_cards', methods=['GET', 'POST'])
def admin_generate_cards():
    """Show the card-generation form (GET) or create new prepaid cards (POST).

    POST form fields:
        class: Card denomination; must parse as a positive integer because
            redemption converts it with ``int()``. Defaults to ``'10'``.
        quantity: Number of cards to create, between 1 and 1000. Defaults
            to 1.

    Returns:
        The rendered form for GET. For POST, JSON with ``success`` and the
        list of created ``cards``; invalid input yields ``success: False``
        with HTTP 400.
    """
    # SECURITY NOTE(review): nothing in this handler checks who is calling
    # it, so as written anyone who can reach the URL can mint cards. Confirm
    # whether access is restricted elsewhere (proxy, before_request hook)
    # and add an explicit admin check here if it is not.
    import secrets  # card codes are secrets, so use a CSPRNG rather than `random`

    if request.method != 'POST':
        return render_template('admin_generate.html')

    max_quantity = 1000

    try:
        class_value = int((request.form.get('class', '10') or '').strip())
    except ValueError:
        class_value = 0
    if class_value <= 0:
        return jsonify({'success': False, 'message': 'Card class must be a positive integer'}), 400
    # Store the canonical digits so the redemption comparisons against
    # '35' / '45' are not defeated by input such as ' 35' or '035'.
    card_class = str(class_value)

    try:
        quantity = int(request.form.get('quantity', 1))
    except (TypeError, ValueError):
        quantity = 0
    if not 1 <= quantity <= max_quantity:
        return jsonify({
            'success': False,
            'message': f'Quantity must be between 1 and {max_quantity}'
        }), 400

    db = get_db()
    digits = '0123456789'
    generated_list = []

    for _ in range(quantity):
        card_data = None
        for _attempt in range(10):
            code = ''.join(secrets.choice(digits) for _ in range(16))
            serial = '-'.join(str(1000 + secrets.randbelow(9000)) for _ in range(3))
            candidate = {
                "code": code,
                "serial": serial,
                "class": card_class,
                "used": False,
                "used_by": None,
                "used_at": None,
                "created_at": datetime.now().isoformat()
            }
            # update_key runs the callback under the store's write lock and
            # stores nothing when it returns None, so an existing card with
            # the same code is never overwritten.
            card_data = db.update_key(
                'cards',
                code,
                lambda existing, new=candidate: new if existing is None else None
            )
            if card_data is not None:
                break
        if card_data is None:
            return jsonify({
                'success': False,
                'message': 'Could not allocate a unique card code',
                'cards': generated_list
            }), 500
        generated_list.append(card_data)

    return jsonify({'success': True, 'cards': generated_list})
================================================================================
FILE 30: media\__init__.py
FULL PATH: main_project\media\__init__.py
================================================================================
"""
Media package.
Voice upload/transcription and image upload/analysis.
"""
================================================================================
FILE 31: media\routes.py
FULL PATH: main_project\media\routes.py
================================================================================
"""
Media API routes.
Voice recording upload, transcription, image upload and analysis.
Updated to use MemoryDB.
"""
import os
import tempfile
from datetime import datetime
from flask import Blueprint, request, jsonify, session
from auth.helpers import is_session_valid
from memory_db import get_db
from subjects.access import validate_user_subject_access
# Blueprint on which the voice and image routes below are registered.
media_bp = Blueprint('media', __name__)
# Agent object used by the routes; stays None until init_media_agent() is called.
_agent = None
def init_media_agent(agent):
    """Store the agent object that the media routes call.

    The routes in this module use it for audio upload, transcription, image
    upload and image analysis, so it must be set before they are served.
    """
    global _agent
    _agent = agent
@media_bp.route('/voice-upload', methods=['POST'])
def voice_upload():
    """Receive an audio recording and upload it through the media agent.

    Expects a multipart form with an ``audio`` file. The file is written to
    a temporary path, handed to ``_agent.upload_audio`` and the temporary
    file is removed afterwards whether or not the upload worked.

    Returns:
        JSON ``{"file_url": ...}`` on success, or ``{"error": ...}`` with
        401 (invalid session), 403 (unknown or unverified user), 400 (no
        file) or 500 (upload failed).
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    username = session['username']
    db = get_db()
    user = db.read_key('users', username)
    if not user or not user.get('verified', False):
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ"}), 403

    if 'audio' not in request.files:
        return jsonify({"error": "ู„ู… ูŠุชู… ุฅุฑุณุงู„ ู…ู„ู ุตูˆุชูŠ"}), 400

    audio_file = request.files['audio']
    suffix = os.path.splitext(audio_file.filename)[1] if audio_file.filename else ''
    if not suffix:
        suffix = '.webm'

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
    unique_filename = f"{username}_{timestamp}{suffix}"

    # Reserve a temp path and close the handle before writing to it, so the
    # path can be reopened on every platform.
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp_path = tmp.name

    try:
        # save() is inside the try so a failed write cannot leak the file.
        audio_file.save(tmp_path)
        file_url, error = _agent.upload_audio(tmp_path, unique_filename)
    finally:
        try:
            os.remove(tmp_path)
        except OSError:
            # Best-effort cleanup; a leftover temp file must not fail the request.
            pass

    if error:
        return jsonify({"error": f"ูุดู„ ุฑูุน ุงู„ู…ู„ู ุงู„ุตูˆุชูŠ: {error}"}), 500
    return jsonify({"file_url": file_url})
@media_bp.route('/voice-transcribe', methods=['POST'])
def voice_transcribe():
    """Transcribe a previously uploaded recording through the media agent.

    Expects a JSON body with ``file_url``.

    Returns:
        JSON ``{"transcription": ...}`` on success, or ``{"error": ...}``
        with 401 (invalid session), 403 (unknown or unverified user), 400
        (no URL given, or no speech recognised) or 500 (transcription
        failed).
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    username = session['username']

    # A missing/non-object body or a non-string URL is handled as "no URL"
    # (400) instead of raising inside the handler.
    data = request.get_json(silent=True)
    raw_url = data.get('file_url', '') if isinstance(data, dict) else ''
    file_url = raw_url.strip() if isinstance(raw_url, str) else ''

    db = get_db()
    user = db.read_key('users', username)
    if not user or not user.get('verified', False):
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ"}), 403

    if not file_url:
        return jsonify({"error": "ู„ู… ูŠุชู… ุชุญุฏูŠุฏ ุฑุงุจุท ุงู„ู…ู„ู ุงู„ุตูˆุชูŠ"}), 400

    # NOTE(review): file_url comes straight from the client and is passed to
    # the agent unchanged; confirm the agent restricts which hosts it fetches.
    transcription, error = _agent.transcribe_audio(file_url)
    if error:
        return jsonify({"error": f"ูุดู„ ุชุญูˆูŠู„ ุงู„ุตูˆุช ุฅู„ู‰ ู†ุต: {error}"}), 500

    if not transcription or not transcription.strip():
        return jsonify({"error": "ู„ู… ูŠุชู… ุงู„ุชุนุฑู ุนู„ู‰ ุฃูŠ ูƒู„ุงู… ููŠ ุงู„ุชุณุฌูŠู„"}), 400

    return jsonify({"transcription": transcription.strip()})
@media_bp.route('/image-upload-analyze', methods=['POST'])
def image_upload_analyze():
    """Upload an image, then have the media agent analyse it.

    Expects a multipart form with an ``image`` file. The user must have a
    current subject selected (read from the agent's chat history, keyed by
    username) and must have access to that subject.

    Returns:
        JSON with ``supported: True``, ``image_url`` and ``extracted_text``
        on success; ``supported: False`` with a ``message`` when the
        analysis contains ``<unsupported>``; otherwise ``{"error": ...}``
        with 401, 403, 400 or 500.
    """
    if not is_session_valid():
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ", "code": "unauthorized"}), 401

    username = session['username']
    db = get_db()
    user = db.read_key('users', username)
    if not user or not user.get('verified', False):
        return jsonify({"error": "ุบูŠุฑ ู…ุตุฑุญ"}), 403

    # The username doubles as the chat-history session id.
    session_id = username
    current_subject = _agent.chat_history.get_subject(session_id)
    if not current_subject:
        return jsonify({"error": "ูŠุฑุฌู‰ ุงุฎุชูŠุงุฑ ุงู„ู…ุงุฏุฉ ุฃูˆู„ุงู‹", "need_subject": True}), 400

    has_access, access_error = validate_user_subject_access(username, current_subject)
    if not has_access:
        return jsonify({"error": access_error, "code": "no_access"}), 403

    if 'image' not in request.files:
        return jsonify({"error": "ู„ู… ูŠุชู… ุฅุฑุณุงู„ ุตูˆุฑุฉ"}), 400

    image_file = request.files['image']
    # `not filename` also covers a filename of None, which would otherwise
    # crash os.path.splitext below.
    if not image_file.filename:
        return jsonify({"error": "ู„ู… ูŠุชู… ุงุฎุชูŠุงุฑ ุตูˆุฑุฉ"}), 400

    suffix = os.path.splitext(image_file.filename)[1] or '.jpg'

    # Reserve a temp path and close the handle before writing to it, so the
    # path can be reopened on every platform.
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp_path = tmp.name

    try:
        # save() is inside the try so a failed write cannot leak the file.
        image_file.save(tmp_path)
        print(f"๐Ÿ“ค Uploading image to Cloudinary...")
        image_url, upload_error = _agent.upload_image_to_cloudinary(tmp_path)
    finally:
        try:
            os.remove(tmp_path)
        except OSError:
            # Best-effort cleanup; a leftover temp file must not fail the request.
            pass

    if upload_error:
        return jsonify({"error": f"ูุดู„ ุฑูุน ุงู„ุตูˆุฑุฉ: {upload_error}"}), 500
    if not image_url:
        return jsonify({"error": "ูุดู„ ุงู„ุญุตูˆู„ ุนู„ู‰ ุฑุงุจุท ุงู„ุตูˆุฑุฉ"}), 500

    print(f"โœ… Image uploaded: {image_url}")
    print(f"๐Ÿ“ Analyzing image with GPT...")

    analysis = _agent.analyze_image(image_url, session_id)
    if not analysis:
        return jsonify({"error": "ูุดู„ ุชุญู„ูŠู„ ุงู„ุตูˆุฑุฉ. ุญุงูˆู„ ู…ุฑุฉ ุฃุฎุฑู‰."}), 500

    analysis = analysis.strip()
    if '<unsupported>' in analysis.lower():
        return jsonify({
            "supported": False,
            "message": "โš ๏ธ ู‡ุฐู‡ ุงู„ุตูˆุฑุฉ ู„ุง ุชุญุชูˆูŠ ุนู„ู‰ ู…ุญุชูˆู‰ ุฏุฑุงุณูŠ. ูŠุฑุฌู‰ ุฅุฑุณุงู„ ุตูˆุฑ ุชุชุนู„ู‚ ุจุงู„ุฏุฑุงุณุฉ ูู‚ุท."
        })

    print(f"โœ… Image analyzed successfully, extracted {len(analysis)} chars")
    return jsonify({
        "supported": True,
        "image_url": image_url,
        "extracted_text": analysis
    })
================================================================================
FILE 32: memory_db.py
FULL PATH: main_project\memory_db.py
================================================================================
"""
In-Memory Database with GitHub-only backup.
All data lives in RAM. NO local file reads or writes.
On startup: loads from GitHub.
Backup: manual push to GitHub via API endpoints.
"""
import time
import threading
from datetime import datetime
class RWLock:
    """Read/write lock built on one ``threading.Condition``.

    Readers only hold the underlying lock long enough to adjust the reader
    count. A writer takes the underlying lock, waits until the reader count
    is zero, and keeps the lock until ``write_release``.
    """

    def __init__(self):
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0

    def read_acquire(self):
        """Register one more active reader."""
        with self._read_ready:
            self._readers += 1

    def read_release(self):
        """Unregister a reader and wake waiters once no readers remain."""
        with self._read_ready:
            self._readers -= 1
            if not self._readers:
                self._read_ready.notify_all()

    def write_acquire(self):
        """Take the lock and block until there are no active readers.

        The lock stays held on return; ``write_release`` gives it back.
        """
        self._read_ready.acquire()
        while self._readers > 0:
            # wait() releases the lock while blocked and re-takes it on wake.
            self._read_ready.wait()

    def write_release(self):
        """Release the lock taken by ``write_acquire``."""
        self._read_ready.release()
class MemoryDB:
    """
    In-memory key/value database with one dict per named store.

    Every store is guarded by its own RWLock. Data is loaded from GitHub
    when the instance is created and is only written back when one of the
    push methods is called; this class performs no local file I/O.
    """

    _instance = None
    _init_lock = threading.Lock()

    # Names of the stores held in memory.
    STORES = ['users', 'telegram', 'cards', 'chat_history']

    # Store name -> filename used for that store on GitHub.
    STORE_FILES = {
        'users': 'users.json',
        'telegram': 'users_db.json',
        'cards': 'cards.json',
        'chat_history': 'chat_history_db.json',
    }

    @classmethod
    def get_instance(cls):
        """Return the shared instance, creating it on first use.

        The instance is re-checked after taking ``_init_lock`` so that two
        callers racing on the first call create only one instance.
        """
        if cls._instance is None:
            with cls._init_lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def __init__(self):
        self._data = {}
        self._locks = {}
        self._backup_lock = threading.Lock()
        # Starts at construction time; updated by push_to_github on success.
        self._last_backup = time.time()

        for store_name in self.STORES:
            self._locks[store_name] = RWLock()
            self._data[store_name] = {}

        # Load from GitHub on startup
        self._initial_load()
        print(f" โœ… MemoryDB initialized (GitHub-only, zero local files)")

    def _initial_load(self):
        """
        Fill the stores from GitHub at startup.

        Any failure (GitHub not configured, module missing, pull error) is
        reported on stdout and leaves the stores empty; it never raises.
        """
        try:
            from github_storage import get_github_storage
            gh = get_github_storage()

            if not gh._configured:
                print(f"\n โš ๏ธ GitHub not configured - starting with EMPTY databases")
                print(f" Set GITHUB_TOKEN and GITHUB_REPO environment variables")
                print(f" First /backup_db call will CREATE files on GitHub automatically")
                return

            print(f"\n ๐Ÿ“ฅ Loading databases from GitHub...")
            results, error = gh.pull_all()

            if error:
                # Surface a reported pull error instead of dropping it: the
                # stores may be incomplete, and a later push would upload
                # that incomplete state.
                print(f" WARNING: GitHub pull reported: {error}")

            if results:
                loaded_any = False
                for store_name, data in results.items():
                    if data and isinstance(data, dict) and len(data) > 0:
                        self._data[store_name] = data
                        loaded_any = True
                if not loaded_any:
                    print(f"\n โ„น๏ธ All databases empty on GitHub (first run)")
                    print(f" Users will be created as they sign up")
                    print(f" Call /backup_db to push data to GitHub anytime")

            total = sum(len(self._data[s]) for s in self.STORES)
            if total > 0:
                print(f"\n ๐Ÿ“Š Total records loaded: {total}")
            else:
                print(f"\n ๐Ÿ“Š Starting fresh with 0 records")
        except ImportError:
            print(f" โŒ github_storage module not found")
            print(f" โš ๏ธ Starting with empty databases")
        except Exception as e:
            print(f" โŒ GitHub load error: {e}")
            print(f" โš ๏ธ Starting with empty databases (app will work, just no saved data)")

    # --- READ OPERATIONS ---

    def read(self, store_name):
        """Return a shallow copy of a whole store ({} for an unknown store)."""
        lock = self._locks.get(store_name)
        if not lock:
            return {}
        lock.read_acquire()
        try:
            return dict(self._data.get(store_name, {}))
        finally:
            lock.read_release()

    def read_key(self, store_name, key, default=None):
        """Return the value stored under ``key``, or ``default`` if absent.

        The stored object itself is returned, not a copy.
        """
        lock = self._locks.get(store_name)
        if not lock:
            return default
        lock.read_acquire()
        try:
            return self._data.get(store_name, {}).get(key, default)
        finally:
            lock.read_release()

    def read_keys(self, store_name, keys):
        """Return ``{key: value}`` for those of ``keys`` present in the store."""
        lock = self._locks.get(store_name)
        if not lock:
            return {}
        lock.read_acquire()
        try:
            store = self._data.get(store_name, {})
            return {k: store[k] for k in keys if k in store}
        finally:
            lock.read_release()

    def has_key(self, store_name, key):
        """Return True if ``key`` exists in the store."""
        lock = self._locks.get(store_name)
        if not lock:
            return False
        lock.read_acquire()
        try:
            return key in self._data.get(store_name, {})
        finally:
            lock.read_release()

    def count(self, store_name):
        """Return the number of records in the store (0 if unknown)."""
        lock = self._locks.get(store_name)
        if not lock:
            return 0
        lock.read_acquire()
        try:
            return len(self._data.get(store_name, {}))
        finally:
            lock.read_release()

    # --- WRITE OPERATIONS ---

    def write(self, store_name, key, value):
        """Store ``value`` under ``key``; does nothing for an unknown store."""
        lock = self._locks.get(store_name)
        if not lock:
            return
        lock.write_acquire()
        try:
            if store_name not in self._data:
                self._data[store_name] = {}
            self._data[store_name][key] = value
        finally:
            lock.write_release()

    def write_many(self, store_name, updates):
        """Merge the ``updates`` mapping into the store."""
        lock = self._locks.get(store_name)
        if not lock:
            return
        lock.write_acquire()
        try:
            if store_name not in self._data:
                self._data[store_name] = {}
            self._data[store_name].update(updates)
        finally:
            lock.write_release()

    def write_full(self, store_name, data):
        """Replace the whole store with ``data`` (stored by reference)."""
        lock = self._locks.get(store_name)
        if not lock:
            return
        lock.write_acquire()
        try:
            self._data[store_name] = data
        finally:
            lock.write_release()

    def update_key(self, store_name, key, update_fn):
        """Read-modify-write one key while holding the store's write lock.

        ``update_fn`` receives the current value (None if the key is absent).
        Its return value is stored unless it is None, in which case the store
        is left untouched.

        Returns:
            Whatever ``update_fn`` returned, or None for an unknown store.
        """
        lock = self._locks.get(store_name)
        if not lock:
            return None
        lock.write_acquire()
        try:
            current = self._data.get(store_name, {}).get(key, None)
            new_value = update_fn(current)
            if new_value is not None:
                if store_name not in self._data:
                    self._data[store_name] = {}
                self._data[store_name][key] = new_value
            return new_value
        finally:
            lock.write_release()

    def delete(self, store_name, key):
        """Remove ``key``; return True if it existed."""
        lock = self._locks.get(store_name)
        if not lock:
            return False
        lock.write_acquire()
        try:
            if key in self._data.get(store_name, {}):
                del self._data[store_name][key]
                return True
            return False
        finally:
            lock.write_release()

    def delete_many(self, store_name, keys):
        """Remove every key in ``keys`` that exists; return how many were removed."""
        lock = self._locks.get(store_name)
        if not lock:
            return 0
        lock.write_acquire()
        try:
            count = 0
            store = self._data.get(store_name, {})
            for key in keys:
                if key in store:
                    del store[key]
                    count += 1
            return count
        finally:
            lock.write_release()

    # --- QUERY OPERATIONS ---

    def find(self, store_name, predicate):
        """Return ``[(key, value), ...]`` for records where ``predicate(key, value)`` is truthy."""
        lock = self._locks.get(store_name)
        if not lock:
            return []
        lock.read_acquire()
        try:
            return [(k, v) for k, v in self._data.get(store_name, {}).items() if predicate(k, v)]
        finally:
            lock.read_release()

    def find_keys_by_prefix(self, store_name, prefix):
        """Return ``{key: value}`` for keys starting with ``prefix``."""
        lock = self._locks.get(store_name)
        if not lock:
            return {}
        lock.read_acquire()
        try:
            store = self._data.get(store_name, {})
            return {k: v for k, v in store.items() if k.startswith(prefix)}
        finally:
            lock.read_release()

    # --- GITHUB BACKUP OPERATIONS ---

    def push_to_github(self):
        """Push a snapshot of every store to GitHub.

        Returns:
            ``(success, errors)`` as returned by ``push_all``, or
            ``(False, [message])`` if an exception occurred.
        """
        try:
            from github_storage import get_github_storage
            gh = get_github_storage()

            data_map = {}
            for store_name in self.STORES:
                lock = self._locks[store_name]
                lock.read_acquire()
                try:
                    data_map[store_name] = dict(self._data.get(store_name, {}))
                finally:
                    lock.read_release()

            success, errors = gh.push_all(data_map)
            if success:
                self._last_backup = time.time()
            return success, errors
        except Exception as e:
            err = f"GitHub push error: {e}"
            print(f" โŒ {err}")
            return False, [err]

    def pull_from_github(self):
        """Pull all stores from GitHub and replace the in-memory data.

        Only non-empty dict results for known stores are applied; anything
        else (None, empty, wrong type, unknown store) is skipped.

        Returns:
            ``(True, None)`` on success, otherwise ``(False, error)``.
        """
        try:
            from github_storage import get_github_storage
            gh = get_github_storage()

            results, error = gh.pull_all()
            if error:
                return False, error
            if not results:
                return False, "No data returned from GitHub"

            applied = 0
            for store_name, data in results.items():
                if not (data and isinstance(data, dict)):
                    continue
                lock = self._locks.get(store_name)
                if not lock:
                    continue
                lock.write_acquire()
                try:
                    self._data[store_name] = data
                finally:
                    lock.write_release()
                # Count only what was applied; calling len() on a skipped
                # None result used to raise after memory had been replaced.
                applied += len(data)

            print(f" โœ… Pulled {applied} records from GitHub into memory")
            return True, None
        except Exception as e:
            err = f"GitHub pull error: {e}"
            print(f" โŒ {err}")
            return False, err

    def push_single_to_github(self, store_name):
        """Push one store to its GitHub file.

        Returns:
            ``(success, error)`` from ``push_file``, or ``(False, message)``
            for an unknown store or an exception.
        """
        if store_name not in self.STORE_FILES:
            return False, f"Unknown store: {store_name}"
        try:
            from github_storage import get_github_storage
            gh = get_github_storage()

            filename = self.STORE_FILES[store_name]
            lock = self._locks[store_name]
            lock.read_acquire()
            try:
                data = dict(self._data.get(store_name, {}))
            finally:
                lock.read_release()

            success, error = gh.push_file(filename, data)
            return success, error
        except Exception as e:
            return False, f"Error pushing {store_name}: {e}"

    # --- STATS ---

    def get_stats(self):
        """Return per-store record counts plus a ``_meta`` entry."""
        stats = {}
        for store_name in self.STORES:
            lock = self._locks[store_name]
            lock.read_acquire()
            try:
                stats[store_name] = {
                    "records": len(self._data.get(store_name, {})),
                    "github_file": self.STORE_FILES.get(store_name, ""),
                }
            finally:
                lock.read_release()
        stats["_meta"] = {
            "last_backup": datetime.fromtimestamp(self._last_backup).isoformat(),
            "mode": "github_only",
            "local_files": False,
        }
        return stats

    def shutdown(self):
        """Log the record count at shutdown; nothing is saved here."""
        print(" ๐Ÿ›‘ MemoryDB shutting down...")
        total = sum(len(self._data.get(s, {})) for s in self.STORES)
        print(f" โš ๏ธ {total} records in memory. Push to GitHub if needed: /backup_db")
        print(" โœ… MemoryDB shutdown complete")
def get_db():
    """Return the shared MemoryDB instance from ``MemoryDB.get_instance()``."""
    return MemoryDB.get_instance()
================================================================================
FILE 33: subjects\__init__.py
FULL PATH: main_project\subjects\__init__.py
================================================================================
"""
Subjects package.
Handles subject definitions, access control, and file loading.
"""
================================================================================
FILE 34: subjects\access.py
FULL PATH: main_project\subjects\access.py
================================================================================
"""
Subject access validation.
Updated to use MemoryDB for fast lookups.
"""
from subjects.definitions import SUBJECTS
from memory_db import get_db
def get_user_accessible_subjects(username):
    """List every subject of the user's track with its access status.

    Returns:
        ``(subjects, None)`` where each entry has ``id``, ``name``,
        ``price``, ``icon``, ``free`` and ``accessible``; or
        ``(None, error_message)`` when the user is missing, not verified,
        or has a track that is not in SUBJECTS.
    """
    account = get_db().read_key('users', username)
    if not account:
        return None, "ุงู„ู…ุณุชุฎุฏู… ุบูŠุฑ ู…ูˆุฌูˆุฏ"
    if not account.get('verified', False):
        return None, "ุงู„ุญุณุงุจ ุบูŠุฑ ู…ูุนู‘ู„"

    student_type = account.get('student_type', '')
    owned = account.get('purchased_subjects', [])
    if student_type not in SUBJECTS:
        return None, f"ู†ูˆุน ุงู„ุทุงู„ุจ '{student_type}' ุบูŠุฑ ู…ุนุฑูˆู"

    def describe(entry):
        # A subject is free when flagged as such or when its price is zero.
        free = entry.get('free', False) or entry.get('price', 0) == 0
        bought = entry['id'] in owned
        return {
            'id': entry['id'],
            'name': entry['name'],
            'price': entry['price'],
            'icon': entry['icon'],
            'free': free,
            'accessible': free or bought
        }

    return [describe(entry) for entry in SUBJECTS[student_type]], None
def validate_user_subject_access(username, subject_id):
    """Decide whether a user may open one subject.

    Returns:
        ``(True, None)`` when the subject belongs to the user's track and is
        free or purchased; otherwise ``(False, error_message)``.
    """
    account = get_db().read_key('users', username)
    if not account:
        return False, "ุงู„ู…ุณุชุฎุฏู… ุบูŠุฑ ู…ูˆุฌูˆุฏ"
    if not account.get('verified', False):
        return False, "ุงู„ุญุณุงุจ ุบูŠุฑ ู…ูุนู‘ู„"

    track_name = account.get('student_type', '')
    owned = account.get('purchased_subjects', [])
    if track_name not in SUBJECTS:
        return False, "ู†ูˆุน ุงู„ุทุงู„ุจ ุบูŠุฑ ู…ุนุฑูˆู"

    # First subject of the track with a matching id, if any.
    match = None
    for candidate in SUBJECTS[track_name]:
        if candidate['id'] == subject_id:
            match = candidate
            break
    if not match:
        return False, f"ุงู„ู…ุงุฏุฉ '{subject_id}' ุบูŠุฑ ู…ุชุงุญุฉ ู„ุดุนุจุชูƒ"

    free = match.get('free', False) or match.get('price', 0) == 0
    bought = subject_id in owned
    if not (free or bought):
        return False, f"ูŠุฌุจ ุดุฑุงุก ู…ุงุฏุฉ '{match['name']}' ู„ู„ูˆุตูˆู„ ุฅู„ูŠู‡ุง"
    return True, None
================================================================================
FILE 35: subjects\definitions.py
FULL PATH: main_project\subjects\definitions.py
================================================================================
"""
Subject definitions for all tracks.
Single source of truth for available subjects.
"""
# Track name -> list of subject records for that track.
# Each record has 'id', 'name', 'price' and 'icon'; the zero-price subject
# also carries 'free': True.
# NOTE(review): the second track uses the ids 'statisticss' and 'itt' where
# the first uses 'statistics' and 'it'. They look like deliberately distinct
# ids rather than typos - confirm before renaming, since ids are stored in
# user records.
SUBJECTS = {
    'ุนู„ู…ูŠ': [
        {'id': 'islamic', 'name': 'ุงู„ุชุฑุจูŠุฉ ุงู„ุงุณู„ุงู…ูŠุฉ', 'price': 0, 'icon': 'book-quran', 'free': True},
        {'id': 'english', 'name': 'ุงู„ู„ุบุฉ ุงู„ุงู†ุฌู„ูŠุฒูŠุฉ', 'price': 5, 'icon': 'language'},
        {'id': 'math', 'name': 'ุงู„ุฑูŠุงุถูŠุงุช', 'price': 5, 'icon': 'calculator'},
        {'id': 'it', 'name': 'ุชู‚ู†ูŠุฉ ุงู„ู…ุนู„ูˆู…ุงุช', 'price': 5, 'icon': 'laptop'},
        {'id': 'statistics', 'name': 'ุงุณุงุณ ุงู„ุงุญุตุงุก', 'price': 5, 'icon': 'chart-bar'},
        {'id': 'chemistry', 'name': 'ุงู„ูƒูŠู…ูŠุงุก', 'price': 5, 'icon': 'flask'},
        {'id': 'physics_electric', 'name': 'ุงู„ููŠุฒูŠุงุก ุงู„ูƒู‡ุฑุจุงุฆูŠุฉ', 'price': 5, 'icon': 'bolt'},
        {'id': 'physics_mechanic', 'name': 'ุงู„ููŠุฒูŠุงุก ุงู„ู…ูŠูƒุงู†ูŠูƒูŠุฉ', 'price': 5, 'icon': 'gear'},
        {'id': 'biology', 'name': 'ุงู„ุงุญูŠุงุก', 'price': 5, 'icon': 'dna'},
        {'id': 'literary_studies', 'name': 'ุงู„ุฏุฑุงุณุงุช ุงู„ุงุฏุจูŠุฉ', 'price': 5, 'icon': 'book'},
        {'id': 'linguistic_studies','name': 'ุงู„ุฏุฑุงุณุงุช ุงู„ู„ุบูˆูŠุฉ', 'price': 5, 'icon': 'pen'}
    ],
    'ุฃุฏุจูŠ': [
        {'id': 'islamic', 'name': 'ุงู„ุชุฑุจูŠุฉ ุงู„ุงุณู„ุงู…ูŠุฉ', 'price': 0, 'icon': 'book-quran', 'free': True},
        {'id': 'english', 'name': 'ุงู„ู„ุบุฉ ุงู„ุงู†ุฌู„ูŠุฒูŠุฉ', 'price': 5, 'icon': 'language'},
        {'id': 'statisticss', 'name': 'ุงู„ุงุญุตุงุก', 'price': 5, 'icon': 'chart-bar'},
        {'id': 'literature', 'name': 'ุงู„ุงุฏุจ ูˆุงู„ู†ุตูˆุต', 'price': 5, 'icon': 'book-open'},
        {'id': 'philosophy', 'name': 'ุงู„ูู„ุณูุฉ', 'price': 5, 'icon': 'brain'},
        {'id': 'reading_writing', 'name': 'ุงู„ู…ุทุงู„ุนุฉ ูˆุงู„ุฅู†ุดุงุก', 'price': 5, 'icon': 'pen-fancy'},
        {'id': 'grammar', 'name': 'ุงู„ู†ุญูˆ ูˆุงู„ุตุฑู ูˆุงู„ุงู…ู„ุงุก', 'price': 5, 'icon': 'spell-check'},
        {'id': 'criticism', 'name': 'ุงู„ู†ู‚ุฏ ุงู„ุงุฏุจูŠ', 'price': 5, 'icon': 'comments'},
        {'id': 'history', 'name': 'ุชุงุฑูŠุฎ ุงู„ูˆุทู† ุงู„ุนุฑุจูŠ ', 'price': 5, 'icon': 'landmark'},
        {'id': 'itt', 'name': 'ุชู‚ู†ูŠุฉ ุงู„ู…ุนู„ูˆู…ุงุช', 'price': 5, 'icon': 'laptop'},
        {'id': 'geography', 'name': 'ุฌุบุฑุงููŠุง ุงู„ุจูŠุฆุฉ', 'price': 5, 'icon': 'globe'},
        {'id': 'psychology', 'name': 'ุนู„ู… ุงู„ู†ูุณ', 'price': 5, 'icon': 'head-side-virus'},
        {'id': 'sociology','name': 'ุนู„ู… ุงู„ุงุฌุชู…ุงุน', 'price': 5, 'icon': 'flag'}
    ]
}
# --- Board-enabled subjects ---
# Subjects that have the interactive board feature.
# Each must have a folder with main.txt, structure.txt, p*.txt, and pages_base_url.txt
# NOTE(review): 'data_structure' does not appear as an id in either SUBJECTS
# track in this module - confirm whether it is still needed.
BOARD_ENABLED_SUBJECTS = [
    'physics_mechanic',
    'physics_electric',
    'chemistry',
    'math',
    'biology',
    'statistics',
    'data_structure',
]
def get_subject_name(student_type, subject_id):
    """Return a subject's display name, or ``subject_id`` itself when the
    track or the subject is not found in SUBJECTS."""
    for entry in SUBJECTS.get(student_type, []):
        if entry['id'] == subject_id:
            return entry['name']
    return subject_id
def is_board_enabled(subject_id):
    """Return True if ``subject_id`` is listed in BOARD_ENABLED_SUBJECTS."""
    return subject_id in BOARD_ENABLED_SUBJECTS
================================================================================
FILE 36: subjects\loader.py
FULL PATH: main_project\subjects\loader.py
================================================================================
"""
Subject file loader.
Loads knowledge files from subject folders.
Handles caching.
"""
import os
import threading
class SubjectLoader:
    """Reads a subject's text files from its folder and caches the result."""

    def __init__(self):
        self._cache = {}
        self._lock = threading.Lock()
        print(" โœ” SubjectLoader initialized")

    def load(self, subject_id):
        """
        Return the file contents of one subject as a dict.

        The folder path is ``subject_id`` itself. The dict holds
        ``main.txt``, ``structure.txt`` and ``pages_base_url`` (each an empty
        string when the file is missing), one entry per consecutive
        ``p1.txt``, ``p2.txt``, ... file, and ``_p_files`` with the names of
        the chapter files that were read. Returns None when the folder does
        not exist. The result is cached after the first successful load.
        """
        with self._lock:
            if subject_id in self._cache:
                return self._cache[subject_id]

        folder_path = subject_id
        if not os.path.exists(folder_path):
            print(f" โŒ Subject folder not found: {folder_path}")
            return None

        subject_data = {}

        # main.txt and structure.txt are read the same way and stored under
        # their own filename.
        for text_name in ("main.txt", "structure.txt"):
            try:
                with open(os.path.join(folder_path, text_name), 'r', encoding='utf-8') as handle:
                    subject_data[text_name] = handle.read()
                print(f" โœ” {subject_id}/{text_name} ({len(subject_data[text_name])} chars)")
            except FileNotFoundError:
                subject_data[text_name] = ""
                print(f" โš  {subject_id}/{text_name} not found")

        # pages_base_url.txt (for board page images) is stored stripped.
        try:
            with open(os.path.join(folder_path, "pages_base_url.txt"), 'r', encoding='utf-8') as handle:
                subject_data["pages_base_url"] = handle.read().strip()
            print(f" โœ” {subject_id}/pages_base_url.txt loaded")
        except FileNotFoundError:
            subject_data["pages_base_url"] = ""
            print(f" โš  {subject_id}/pages_base_url.txt not found")

        # Chapter files: p1.txt, p2.txt, ... up to the first missing number.
        chapter_names = []
        number = 1
        while os.path.exists(os.path.join(folder_path, f"p{number}.txt")):
            chapter_name = f"p{number}.txt"
            try:
                with open(os.path.join(folder_path, chapter_name), 'r', encoding='utf-8') as handle:
                    text = handle.read()
                subject_data[chapter_name] = text
                chapter_names.append(chapter_name)
                print(f" โœ” {subject_id}/{chapter_name} ({len(text)} chars)")
            except Exception as exc:
                # An unreadable chapter is skipped; later chapters still load.
                print(f" โš  Error loading {chapter_name}: {exc}")
            number += 1

        subject_data["_p_files"] = chapter_names
        print(f" ๐Ÿ“Š Subject '{subject_id}' loaded: main + structure + {len(chapter_names)} chapters")

        with self._lock:
            self._cache[subject_id] = subject_data
        return subject_data

    def reload(self, subject_id):
        """Drop any cached copy of the subject and load it again."""
        with self._lock:
            self._cache.pop(subject_id, None)
        return self.load(subject_id)

    def get_p_files(self, subject_id):
        """Return the chapter filenames of a subject ([] if it cannot be loaded)."""
        loaded = self.load(subject_id)
        return loaded.get("_p_files", []) if loaded else []

    def get_pages_base_url(self, subject_id):
        """Return the base URL for book page images ("" if unavailable)."""
        loaded = self.load(subject_id)
        return loaded.get("pages_base_url", "") if loaded else ""
# Module-level shared instance, created when this module is imported.
subject_loader = SubjectLoader()
================================================================================
FILE 37: websocket\__init__.py
FULL PATH: main_project\websocket\__init__.py
================================================================================
"""
WebSocket package.
Real-time session management via SocketIO.
"""
================================================================================
FILE 38: websocket\events.py
FULL PATH: main_project\websocket\events.py
================================================================================
"""
SocketIO event handlers.
Updated to use MemoryDB for fast session checks.
"""
from flask import request
from auth.helpers import active_connections
from memory_db import get_db
def register_socketio_events(socketio):
    """Attach the connect, disconnect, register_session and check_session
    handlers to the given SocketIO object."""

    @socketio.on('connect')
    def handle_connect():
        # Nothing to do until the client registers its session.
        pass

    @socketio.on('disconnect')
    def handle_disconnect():
        # Remove this socket id from every user it is listed under and drop
        # users that have no sockets left.
        for username in list(active_connections.keys()):
            sockets = active_connections[username]
            if request.sid not in sockets:
                continue
            sockets.remove(request.sid)
            if not sockets:
                del active_connections[username]

    @socketio.on('register_session')
    def handle_register_session(data):
        username = data.get('username')
        session_id = data.get('session_id')
        if not (username and session_id):
            return
        sockets = active_connections.setdefault(username, [])
        if request.sid not in sockets:
            sockets.append(request.sid)

    @socketio.on('check_session')
    def handle_check_session(data):
        username = data.get('username')
        session_id = data.get('session_id')

        record = get_db().read_key('users', username)
        if not record:
            # Unknown user: no event is emitted.
            return

        if record.get('session_id') == session_id:
            socketio.emit('session_valid', {
                'status': 'valid'
            }, room=request.sid)
        else:
            # The stored session id differs from the one the client holds.
            socketio.emit('force_logout', {
                'message': 'ุชู… ุชุณุฌูŠู„ ุงู„ุฏุฎูˆู„ ู…ู† ุฌู‡ุงุฒ ุขุฎุฑ.'
            }, room=request.sid)