Spaces:
Paused
Paused
| from __future__ import annotations | |
| import atexit | |
| import json | |
| import time | |
| from functools import wraps | |
| from typing import Callable | |
| from flask import Flask, Response, flash, g, jsonify, redirect, render_template, request, session, stream_with_context, url_for | |
| from core.config import AppConfig | |
| from core.db import Database | |
| from core.security import SecretBox, hash_password, verify_password | |
| from core.task_manager import TaskManager | |
| from core.runtime_logging import configure_logging, get_logger | |
| configure_logging() | |
| APP_LOGGER = get_logger("sacc.web") | |
| config = AppConfig.load() | |
| store = Database(config.db_path, default_parallel_limit=config.default_parallel_limit) | |
| store.init_db() | |
| secret_box = SecretBox(config.encryption_key) | |
| task_manager = TaskManager(config=config, store=store, secret_box=secret_box) | |
| def _seed_legacy_user() -> None: | |
| if store.list_users(): | |
| return | |
| legacy_path = config.root_dir / "user_data.json" | |
| if not legacy_path.exists(): | |
| return | |
| try: | |
| payload = json.loads(legacy_path.read_text(encoding="utf-8")) | |
| except (OSError, json.JSONDecodeError): | |
| return | |
| student_id = str(payload.get("std_id", "")).strip() | |
| password = str(payload.get("password", "")).strip() | |
| if not student_id or not password: | |
| return | |
| user_id = store.create_user(student_id, secret_box.encrypt(password), "Legacy User") | |
| for source_key, category in (("a", "plan"), ("b", "free")): | |
| for course in payload.get("course", {}).get(source_key, []): | |
| course_id = str(course.get("course_id", "")).strip() | |
| course_index = str(course.get("course_index", "")).strip() | |
| if course_id and course_index: | |
| store.add_course(user_id, category, course_id, course_index) | |
| _seed_legacy_user() | |
| task_manager.start() | |
| atexit.register(task_manager.shutdown) | |
| app = Flask(__name__, template_folder="templates", static_folder="static") | |
| app.secret_key = config.session_secret | |
| app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SAMESITE="Lax") | |
| APP_LOGGER.info( | |
| "Application bootstrap complete | data_dir=%s db_path=%s chrome_binary=%s chromedriver_path=%s default_parallel_limit=%s", | |
| config.data_dir, | |
| config.db_path, | |
| config.chrome_binary, | |
| config.chromedriver_path, | |
| config.default_parallel_limit, | |
| ) | |
| CATEGORY_LABELS = { | |
| "plan": "方案选课", | |
| "free": "自由选课", | |
| } | |
| TASK_LABELS = { | |
| "pending": "排队中", | |
| "running": "执行中", | |
| "cancel_requested": "停止中", | |
| "completed": "已完成", | |
| "stopped": "已停止", | |
| "failed": "失败", | |
| } | |
| SKIPPED_REQUEST_LOG_PREFIXES = ( | |
| "/static/", | |
| "/api/", | |
| ) | |
| SKIPPED_REQUEST_LOG_PATHS = { | |
| "/favicon.ico", | |
| } | |
| def _current_actor_label() -> str: | |
| role = session.get("role", "guest") | |
| if role == "user": | |
| return f"user:{session.get('user_id', '-')}" | |
| if role == "admin": | |
| return f"admin:{session.get('admin_username', '-')}" | |
| return "guest" | |
| def before_request_logging() -> None: | |
| g.request_started_at = time.perf_counter() | |
| def after_request_logging(response): | |
| if request.path in SKIPPED_REQUEST_LOG_PATHS: | |
| return response | |
| if any(request.path.startswith(prefix) for prefix in SKIPPED_REQUEST_LOG_PREFIXES): | |
| return response | |
| started_at = getattr(g, "request_started_at", None) | |
| duration_ms = 0.0 if started_at is None else (time.perf_counter() - started_at) * 1000 | |
| remote_addr = request.headers.get("x-forwarded-for") or request.remote_addr or "-" | |
| APP_LOGGER.info( | |
| "HTTP %s %s -> %s in %.1fms | actor=%s remote=%s", | |
| request.method, | |
| request.path, | |
| response.status_code, | |
| duration_ms, | |
| _current_actor_label(), | |
| remote_addr, | |
| ) | |
| return response | |
| def inject_globals() -> dict: | |
| return { | |
| "category_labels": CATEGORY_LABELS, | |
| "task_labels": TASK_LABELS, | |
| "session_role": session.get("role", "guest"), | |
| } | |
| def _login_required(role: str) -> Callable: | |
| def decorator(view: Callable) -> Callable: | |
| def wrapped(*args, **kwargs): | |
| current_role = session.get("role") | |
| if role == "user" and current_role != "user": | |
| flash("请先登录学生账号。", "warning") | |
| return redirect(url_for("login")) | |
| if role == "admin" and current_role != "admin": | |
| flash("请先登录管理员账号。", "warning") | |
| return redirect(url_for("admin_login")) | |
| return view(*args, **kwargs) | |
| return wrapped | |
| return decorator | |
| def _get_current_user() -> dict | None: | |
| user_id = session.get("user_id") | |
| if not user_id: | |
| return None | |
| return store.get_user(int(user_id)) | |
| def _get_admin_identity() -> dict: | |
| return { | |
| "username": session.get("admin_username", ""), | |
| "is_super_admin": bool(session.get("is_super_admin", False)), | |
| } | |
| def _user_owns_course(user_id: int, course_target_id: int) -> bool: | |
| return any(course["id"] == course_target_id for course in store.list_courses_for_user(user_id)) | |
| def _build_user_dashboard_context(user: dict) -> dict: | |
| return { | |
| "current_user": user, | |
| "courses": store.list_courses_for_user(user["id"]), | |
| "task": store.get_latest_task_for_user(user["id"]), | |
| "recent_logs": store.list_recent_logs(user_id=user["id"], limit=config.logs_page_size), | |
| } | |
| def _build_admin_dashboard_context() -> dict: | |
| users = store.list_users() | |
| for user in users: | |
| user["courses"] = store.list_courses_for_user(user["id"]) | |
| user["latest_task"] = store.get_latest_task_for_user(user["id"]) | |
| admin_identity = _get_admin_identity() | |
| return { | |
| "users": users, | |
| "admins": store.list_admins(), | |
| "stats": store.get_admin_stats(), | |
| "recent_tasks": store.list_recent_tasks(limit=18), | |
| "recent_logs": store.list_recent_logs(limit=config.logs_page_size), | |
| "parallel_limit": store.get_parallel_limit(), | |
| "is_super_admin": admin_identity["is_super_admin"], | |
| "admin_identity": admin_identity, | |
| } | |
| def _queue_task_for_user(user: dict, *, requested_by: str, requested_by_role: str) -> tuple[dict, bool]: | |
| return task_manager.queue_task(user["id"], requested_by=requested_by, requested_by_role=requested_by_role) | |
| def _latest_log_id(logs: list[dict]) -> int: | |
| if not logs: | |
| return 0 | |
| return int(logs[-1]["id"]) | |
| def index(): | |
| if session.get("role") == "user": | |
| return redirect(url_for("dashboard")) | |
| if session.get("role") == "admin": | |
| return redirect(url_for("admin_dashboard")) | |
| return redirect(url_for("login")) | |
| def login(): | |
| if request.method == "POST": | |
| student_id = request.form.get("student_id", "").strip() | |
| password = request.form.get("password", "") | |
| user = store.get_user_by_student_id(student_id) | |
| if user is None: | |
| flash("没有找到该学号对应的账号,请联系管理员录入。", "danger") | |
| return render_template("login.html") | |
| if not user["is_active"]: | |
| flash("该账号已被管理员禁用。", "danger") | |
| return render_template("login.html") | |
| try: | |
| stored_password = secret_box.decrypt(user["password_encrypted"]) | |
| except Exception: | |
| flash("账号数据损坏,请联系管理员重置密码。", "danger") | |
| return render_template("login.html") | |
| if stored_password != password: | |
| flash("学号或密码不正确。", "danger") | |
| return render_template("login.html") | |
| session.clear() | |
| session["role"] = "user" | |
| session["user_id"] = user["id"] | |
| return redirect(url_for("dashboard")) | |
| return render_template("login.html") | |
| def logout(): | |
| session.clear() | |
| return redirect(url_for("login")) | |
| def admin_login(): | |
| if request.method == "POST": | |
| username = request.form.get("username", "").strip() | |
| password = request.form.get("password", "") | |
| is_super_admin = username == config.super_admin_username and password == config.super_admin_password | |
| admin_row = store.get_admin_by_username(username) | |
| is_regular_admin = bool(admin_row and verify_password(admin_row["password_hash"], password)) | |
| if not is_super_admin and not is_regular_admin: | |
| flash("管理员账号或密码错误。", "danger") | |
| return render_template("admin_login.html") | |
| session.clear() | |
| session["role"] = "admin" | |
| session["admin_username"] = username | |
| session["is_super_admin"] = is_super_admin | |
| return redirect(url_for("admin_dashboard")) | |
| return render_template("admin_login.html") | |
| def admin_logout(): | |
| session.clear() | |
| return redirect(url_for("admin_login")) | |
| def dashboard(): | |
| user = _get_current_user() | |
| if user is None: | |
| session.clear() | |
| return redirect(url_for("login")) | |
| return render_template("dashboard.html", **_build_user_dashboard_context(user)) | |
| def update_profile(): | |
| user = _get_current_user() | |
| if user is None: | |
| session.clear() | |
| return redirect(url_for("login")) | |
| password = request.form.get("password", "").strip() | |
| display_name = request.form.get("display_name", "").strip() | |
| if not password: | |
| flash("密码不能为空。", "danger") | |
| return redirect(url_for("dashboard")) | |
| store.update_user(user["id"], password_encrypted=secret_box.encrypt(password), display_name=display_name) | |
| flash("账号信息已更新。", "success") | |
| return redirect(url_for("dashboard")) | |
| def add_course(): | |
| user = _get_current_user() | |
| if user is None: | |
| session.clear() | |
| return redirect(url_for("login")) | |
| category = request.form.get("category", "free") | |
| course_id = request.form.get("course_id", "").strip() | |
| course_index = request.form.get("course_index", "").strip() | |
| if not course_id.isdigit() or not course_index.isdigit() or len(course_index) != 2: | |
| flash("课程号必须为数字,课序号必须是 2 位数字。", "danger") | |
| return redirect(url_for("dashboard")) | |
| store.add_course(user["id"], category, course_id, course_index) | |
| flash("课程已加入任务列表。", "success") | |
| return redirect(url_for("dashboard")) | |
| def delete_course(course_target_id: int): | |
| user = _get_current_user() | |
| if user is None: | |
| session.clear() | |
| return redirect(url_for("login")) | |
| if not _user_owns_course(user["id"], course_target_id): | |
| flash("不能删除不属于当前账号的课程。", "danger") | |
| return redirect(url_for("dashboard")) | |
| store.delete_course(course_target_id) | |
| flash("课程已移除。", "success") | |
| return redirect(url_for("dashboard")) | |
| def start_task(): | |
| user = _get_current_user() | |
| if user is None: | |
| session.clear() | |
| return redirect(url_for("login")) | |
| task, created = _queue_task_for_user(user, requested_by=user["student_id"], requested_by_role="user") | |
| flash("任务已启动。" if created else f"已有任务处于 {TASK_LABELS.get(task['status'], task['status'])} 状态。", "success" if created else "warning") | |
| return redirect(url_for("dashboard")) | |
| def stop_task(): | |
| user = _get_current_user() | |
| if user is None: | |
| session.clear() | |
| return redirect(url_for("login")) | |
| active_task = store.find_active_task_for_user(user["id"]) | |
| if active_task and task_manager.stop_task(active_task["id"]): | |
| flash("停止请求已发送。", "success") | |
| else: | |
| flash("当前没有可停止的任务。", "warning") | |
| return redirect(url_for("dashboard")) | |
| def admin_dashboard(): | |
| return render_template("admin_dashboard.html", **_build_admin_dashboard_context()) | |
| def update_parallel_limit(): | |
| try: | |
| parallel_limit = max(1, min(8, int(request.form.get("parallel_limit", "2")))) | |
| except ValueError: | |
| flash("并行数必须是 1 到 8 的整数。", "danger") | |
| return redirect(url_for("admin_dashboard")) | |
| store.set_parallel_limit(parallel_limit) | |
| flash(f"并行数已更新为 {parallel_limit}。", "success") | |
| return redirect(url_for("admin_dashboard")) | |
| def create_user(): | |
| student_id = request.form.get("student_id", "").strip() | |
| password = request.form.get("password", "").strip() | |
| display_name = request.form.get("display_name", "").strip() | |
| if not student_id.isdigit() or not password: | |
| flash("请填写有效的学号和密码。", "danger") | |
| return redirect(url_for("admin_dashboard")) | |
| if store.get_user_by_student_id(student_id): | |
| flash("该学号已经存在。", "warning") | |
| return redirect(url_for("admin_dashboard")) | |
| store.create_user(student_id, secret_box.encrypt(password), display_name) | |
| flash("用户已创建。", "success") | |
| return redirect(url_for("admin_dashboard")) | |
| def update_user(user_id: int): | |
| user = store.get_user(user_id) | |
| if user is None: | |
| flash("用户不存在。", "danger") | |
| return redirect(url_for("admin_dashboard")) | |
| display_name = request.form.get("display_name", user["display_name"]).strip() | |
| password = request.form.get("password", "").strip() | |
| if password: | |
| store.update_user(user_id, display_name=display_name, password_encrypted=secret_box.encrypt(password)) | |
| else: | |
| store.update_user(user_id, display_name=display_name) | |
| flash("用户信息已更新。", "success") | |
| return redirect(url_for("admin_dashboard")) | |
| def toggle_user(user_id: int): | |
| updated = store.toggle_user_active(user_id) | |
| if updated is None: | |
| flash("用户不存在。", "danger") | |
| else: | |
| flash("用户状态已切换。", "success") | |
| return redirect(url_for("admin_dashboard")) | |
| def admin_add_course(user_id: int): | |
| if store.get_user(user_id) is None: | |
| flash("用户不存在。", "danger") | |
| return redirect(url_for("admin_dashboard")) | |
| category = request.form.get("category", "free") | |
| course_id = request.form.get("course_id", "").strip() | |
| course_index = request.form.get("course_index", "").strip() | |
| if not course_id.isdigit() or not course_index.isdigit() or len(course_index) != 2: | |
| flash("课程号必须为数字,课序号必须是 2 位数字。", "danger") | |
| return redirect(url_for("admin_dashboard")) | |
| store.add_course(user_id, category, course_id, course_index) | |
| flash("课程已添加到对应用户。", "success") | |
| return redirect(url_for("admin_dashboard")) | |
| def admin_delete_course(course_target_id: int): | |
| store.delete_course(course_target_id) | |
| flash("课程已删除。", "success") | |
| return redirect(url_for("admin_dashboard")) | |
| def admin_start_user_task(user_id: int): | |
| user = store.get_user(user_id) | |
| if user is None: | |
| flash("用户不存在。", "danger") | |
| return redirect(url_for("admin_dashboard")) | |
| admin_identity = _get_admin_identity() | |
| task, created = _queue_task_for_user(user, requested_by=admin_identity["username"], requested_by_role="admin") | |
| flash("任务已加入队列。" if created else f"该用户已有任务处于 {TASK_LABELS.get(task['status'], task['status'])} 状态。", "success" if created else "warning") | |
| return redirect(url_for("admin_dashboard")) | |
| def admin_stop_user_task(user_id: int): | |
| active_task = store.find_active_task_for_user(user_id) | |
| if active_task and task_manager.stop_task(active_task["id"]): | |
| flash("已发送停止请求。", "success") | |
| else: | |
| flash("当前没有可停止任务。", "warning") | |
| return redirect(url_for("admin_dashboard")) | |
| def create_admin(): | |
| if not session.get("is_super_admin", False): | |
| flash("只有超级管理员可以新增管理员。", "danger") | |
| return redirect(url_for("admin_dashboard")) | |
| username = request.form.get("username", "").strip() | |
| password = request.form.get("password", "").strip() | |
| if not username or not password: | |
| flash("请填写管理员账号和密码。", "danger") | |
| return redirect(url_for("admin_dashboard")) | |
| if username == config.super_admin_username or store.get_admin_by_username(username): | |
| flash("该管理员账号已存在。", "warning") | |
| return redirect(url_for("admin_dashboard")) | |
| store.create_admin(username, hash_password(password)) | |
| flash("管理员已创建。", "success") | |
| return redirect(url_for("admin_dashboard")) | |
| def user_status(): | |
| user = _get_current_user() | |
| if user is None: | |
| return jsonify({"ok": False}), 401 | |
| task = store.get_latest_task_for_user(user["id"]) | |
| return jsonify({"ok": True, "task": task, "courses": store.list_courses_for_user(user["id"])}) | |
| def admin_status(): | |
| return jsonify( | |
| { | |
| "ok": True, | |
| "stats": store.get_admin_stats(), | |
| "parallel_limit": store.get_parallel_limit(), | |
| "recent_tasks": store.list_recent_tasks(limit=12), | |
| } | |
| ) | |
| def stream_user_logs(): | |
| user = _get_current_user() | |
| if user is None: | |
| return jsonify({"ok": False}), 401 | |
| last_id = int(request.args.get("last_id", _latest_log_id(store.list_recent_logs(user_id=user["id"], limit=1)))) | |
| def generate(): | |
| current_last_id = last_id | |
| while True: | |
| logs = store.list_logs_after(current_last_id, user_id=user["id"], limit=60) | |
| if logs: | |
| for log in logs: | |
| current_last_id = int(log["id"]) | |
| yield f"id: {log['id']}\ndata: {json.dumps(log, ensure_ascii=False)}\n\n" | |
| else: | |
| yield ": keep-alive\n\n" | |
| time.sleep(1) | |
| response = Response(generate(), mimetype="text/event-stream") | |
| response.headers["Cache-Control"] = "no-cache" | |
| response.headers["X-Accel-Buffering"] = "no" | |
| return response | |
| def stream_admin_logs(): | |
| last_id = int(request.args.get("last_id", _latest_log_id(store.list_recent_logs(limit=1)))) | |
| def generate(): | |
| current_last_id = last_id | |
| while True: | |
| logs = store.list_logs_after(current_last_id, limit=80) | |
| if logs: | |
| for log in logs: | |
| current_last_id = int(log["id"]) | |
| yield f"id: {log['id']}\ndata: {json.dumps(log, ensure_ascii=False)}\n\n" | |
| else: | |
| yield ": keep-alive\n\n" | |
| time.sleep(1) | |
| response = Response(generate(), mimetype="text/event-stream") | |
| response.headers["Cache-Control"] = "no-cache" | |
| response.headers["X-Accel-Buffering"] = "no" | |
| return response | |