# app.py (HF Spaces CPU-Optimized)
# CPU-frugal RAG service for school subjects:
# - Default model: quantized instruct GGUF (see GGUF_DEFAULT) + ctx 1024
# - Fast retrieval: FAISS top-k (see TOP_K_FAISS) → sentences picked by lexical overlap (no per-sentence encoding)
# - The encoder is used ONLY for the query & the FAISS search (once per request)
# - The final answer is wrapped in <final>...</final>, generation stops at </final>, retry when empty/ellipsis
# - Admin + Postgres auth are unchanged
import os, json, re, time, logging
from functools import lru_cache, wraps
from typing import Dict, List, Tuple
from dataclasses import dataclass
from datetime import datetime
from zoneinfo import ZoneInfo
from pathlib import Path
from flask import (
Flask, render_template, request, redirect, url_for, session, jsonify, flash
)
import numpy as np
import faiss
import torch
from transformers import AutoTokenizer, AutoModel
from dotenv import load_dotenv
# Load variables through python-dotenv before the os.environ reads below.
load_dotenv()
# ========= ENV & LOGGING =========
# Logging is configured first so that start-up problems below can be reported.
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
log = logging.getLogger("rag-app")

# setdefault keeps any value that the environment already provides.
os.environ.setdefault("KMP_DUPLICATE_LIB_OK", "TRUE")
os.environ.setdefault("OMP_NUM_THREADS", "1")

try:
    # NUM_THREADS defaults to 3 intra-op threads (enough on a CPU Space).
    torch.set_num_threads(int(os.environ.get("NUM_THREADS", "3")))
    torch.set_num_interop_threads(1)
except Exception as exc:
    # Best effort: keep starting with torch's defaults, but do not hide the failure.
    log.warning("[INIT] Could not apply torch thread settings: %s", exc)
# ========= IMPORT EKSTERNAL (wrapper & guardrail) =========
from Guardrail import validate_input # -> bool
from Model import load_model, generate # -> llama.cpp wrapper
# ========= PATH ROOT =========
# Directory that contains this file; model and dataset paths below are built from it.
BASE_DIR = Path(__file__).resolve().parent
# ========= MODEL & RAG CONFIG (tuned for CPU) =========
# Default GGUF file name, used when GGUF_FILENAME is not set.
# NOTE(review): the file header mentions a 3B model, but this default names a 7B model — confirm which is intended.
GGUF_DEFAULT = "DeepSeek-R1-Distill-Qwen-7B-Q4_K_M.gguf" # small & fast; upload to /models
MODEL_PATH = str(BASE_DIR / "models" / os.getenv("GGUF_FILENAME", GGUF_DEFAULT))
# Values passed to load_model() in warmup_models(); each can be overridden through the environment.
CTX_WINDOW = int(os.environ.get("CTX_WINDOW", 1024))
N_GPU_LAYERS = int(os.environ.get("N_GPU_LAYERS", 0))
N_THREADS = int(os.environ.get("NUM_THREADS", 3))
# Name handed to AutoTokenizer/AutoModel.from_pretrained(); the encoder is always placed on the CPU device.
ENCODER_NAME = os.environ.get("ENCODER_NAME", "intfloat/multilingual-e5-large")
ENCODER_DEVICE = torch.device("cpu")
# The dataset already lives in the Space → paths are built RELATIVE to BASE_DIR (keep in sync with your folder layout).
# Each subject key maps to: the FAISS index file, the JSON chunk file, the .npy embedding matrix,
# plus a short label and a description.
SUBJECTS: Dict[str, Dict[str, str]] = {
    "ipas": {
        "index": str(BASE_DIR / "Rag-Pipeline" / "Vektor Database" / "Ipas" / "IPA_index.index"),
        "chunks": str(BASE_DIR / "Dataset" / "Ipas" / "Chunk" / "ipas_chunks.json"),
        "embeddings": str(BASE_DIR / "Dataset" / "Ipas" / "Embedd"/ "ipas_embeddings.npy"),
        "label": "IPAS",
        "desc": "Ilmu Pengetahuan Alam dan Sosial"
    },
    "penjas": {
        "index": str(BASE_DIR / "Rag-Pipeline" / "Vektor Database" / "Penjas" / "PENJAS_index.index"),
        "chunks": str(BASE_DIR / "Dataset" / "Penjas" / "Chunk" / "penjas_chunks.json"),
        "embeddings": str(BASE_DIR / "Dataset" / "Penjas" / "Embedd" / "penjas_embeddings.npy"),
        "label": "PJOK",
        "desc": "Pendidikan Jasmani, Olahraga, dan Kesehatan"
    },
    "pancasila": {
        "index": str(BASE_DIR / "Rag-Pipeline" / "Vektor Database" / "Pancasila" / "PANCASILA_index.index"),
        "chunks": str(BASE_DIR / "Dataset" / "Pancasila" / "Chunk" / "pancasila_chunks.json"),
        "embeddings": str(BASE_DIR / "Dataset" / "Pancasila" / "Embedd" / "pancasila_embeddings.npy"),
        "label": "PANCASILA",
        "desc": "Pendidikan Pancasila dan Kewarganegaraan"
    }
}
# ======= Thresholds & fast-path parameters (relaxed & adaptive) =======
# Number of neighbours requested from FAISS, and how many of their chunks are kept afterwards.
TOP_K_FAISS = int(os.environ.get("TOP_K_FAISS", 15))
TOP_K_FINAL = int(os.environ.get("TOP_K_FINAL", 10))
# ask() answers with FALLBACK_TEXT when the best cosine similarity is below MIN_COSINE.
MIN_COSINE = float(os.environ.get("MIN_COSINE", 0.83)) # NOTE(review): the old comment said "previously 0.83", which equals the current default
MIN_LEXICAL = float(os.environ.get("MIN_LEXICAL", 0.10)) # previously 0.8 → too strict for short queries
# Canned replies: no knowledge found / question rejected by the guardrail.
FALLBACK_TEXT = os.environ.get("FALLBACK_TEXT", "maap pengetahuan tidak ada dalam database")
GUARDRAIL_BLOCK_TEXT = os.environ.get("GUARDRAIL_BLOCK_TEXT", "maap, pertanyaan ditolak oleh guardrail")
# When "true" (case-insensitive), ask() logs the total latency of each request.
ENABLE_PROFILING = os.environ.get("ENABLE_PROFILING", "false").lower() == "true"
# ========= APP =========
app = Flask(__name__)

# Flask signs the session cookie (which carries logged_in / is_admin) with this key.
# A publicly known fallback value would let anyone forge an admin session, so when
# FLASK_SECRET_KEY is missing a random per-process key is generated instead.
_secret_key = os.environ.get("FLASK_SECRET_KEY")
if not _secret_key:
    import secrets

    _secret_key = secrets.token_hex(32)
    log.warning(
        "[INIT] FLASK_SECRET_KEY is not set; using a random per-process key. "
        "Sessions will not survive a restart and are not shared between workers."
    )
app.secret_key = _secret_key

# Trust one proxy hop for the client address, scheme and host headers.
from werkzeug.middleware.proxy_fix import ProxyFix
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)

# Cookie settings: SameSite=None + Secure means the cookie is only sent over HTTPS
# and is also sent on cross-site requests.
app.config.update(
    SESSION_COOKIE_NAME="session",
    SESSION_COOKIE_SAMESITE="None",
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_PATH="/",
    PREFERRED_URL_SCHEME="https",
)
# ========= GLOBALS =========
# Lazily initialised singletons; warmup_models() fills them in on first use.
ENCODER_TOKENIZER = None
ENCODER_MODEL = None
LLM = None
@dataclass(frozen=True)
class SubjectAssets:
    """Immutable bundle of the retrieval data loaded for one subject."""
    # FAISS index read from the subject's "index" path.
    index: faiss.Index
    # The "text" field of every entry in the subject's chunk JSON file.
    texts: List[str]
    # Embedding matrix loaded from the subject's .npy file, (N, dim) per the loader's comment.
    embs: np.ndarray
# ========= TEKS UTIL =========
STOPWORDS_ID = {
"yang","dan","atau","pada","di","ke","dari","itu","ini","adalah","dengan",
"untuk","serta","sebagai","oleh","dalam","akan","kamu","apa","karena",
"agar","sehingga","terhadap","dapat","juga","para","diri",
}
TOKEN_RE = re.compile(r"[A-Za-zÀ-ÖØ-öø-ÿ]+", re.UNICODE)
@lru_cache(maxsize=4096)
def _tok_cached(word: str) -> str:
return word.lower()
def tok_id(text: str) -> List[str]:
return [tw for w in TOKEN_RE.findall(text or "") if (tw := _tok_cached(w)) not in STOPWORDS_ID]
def lexical_overlap(query: str, sent: str) -> float:
    """Return the Jaccard similarity between the token sets of *query* and *sent*.

    Tokens come from ``tok_id`` (lower-cased, stopwords removed). The result is
    0.0 when either side has no tokens.
    """
    query_terms = set(tok_id(query))
    sent_terms = set(tok_id(sent))
    if not (query_terms and sent_terms):
        return 0.0
    shared = query_terms.intersection(sent_terms)
    combined = query_terms.union(sent_terms)
    return len(shared) / max(1, len(combined))
# A sentence counts as a question when it starts with an Indonesian question/command
# word or ends with a question mark.
QUESTION_LIKE_RE = re.compile(r"(^\s*(apa|mengapa|bagaimana|sebutkan|jelaskan)\b|[?]$)", re.IGNORECASE)
# Relaxed instruction filter: only patterns that are clearly task instructions at the
# start of a sentence. The word boundary is applied to the bare verbs only; putting it
# after the colon forms would require a word character right after ":" and would
# therefore miss the usual "Tugas: ..." spelling.
INSTRUCTION_RE = re.compile(r"^\s*((?:kerjakan|diskusikan)\b|(?:tugas|latihan)\s*:)", re.IGNORECASE)
# Lead-in phrases such as "berdasarkan teks di atas" / "menurut sumber" that refer to
# the source material instead of stating a fact.
META_PREFIX_PATTERNS = [
    r"berdasarkan\s+(?:kalimat|sumber|teks|konten|informasi)(?:\s+(?:di\s+atas|tersebut))?",
    r"menurut\s+(?:sumber|teks|konten)",
    r"merujuk\s+pada",
    r"mengacu\s+pada",
    r"bersumber\s+dari",
    r"dari\s+(?:kalimat|sumber|teks|konten)"
]
# The \b after the alternation stops the prefix from matching inside a longer word
# (e.g. "dari sumbernya"), which would otherwise leave a dangling "nya".
META_PREFIX_RE = re.compile(
    r"^\s*(?:" + r"|".join(META_PREFIX_PATTERNS) + r")\b\s*[:\-–—,]?\s*",
    re.IGNORECASE,
)


def clean_prefix(t: str) -> str:
    """Strip leading meta phrases (and one following separator) from *t*.

    Up to three stacked prefixes are removed. ``None`` is treated as an empty
    string; the result has no leading or trailing whitespace.
    """
    t = (t or "").strip()
    for _ in range(3):
        stripped = META_PREFIX_RE.sub("", t).lstrip()
        if stripped == t:
            break
        t = stripped
    return t
def strip_meta_sentence(s: str) -> str:
    """Remove meta lead-ins from a sentence, more aggressively than ``clean_prefix``.

    After the regular prefix cleanup, a sentence that still opens with a meta
    word loses everything up to and including its first punctuation mark,
    unless that would leave nothing.
    """
    cleaned = clean_prefix(s or "")
    still_meta = re.match(
        r"^\s*(berdasarkan|menurut|merujuk|mengacu|bersumber|dari)\b", cleaned, re.IGNORECASE
    )
    if still_meta:
        remainder = re.sub(r"^\s*[^,.;!?]*[,.;!?]\s*", "", cleaned)
        if remainder:
            cleaned = remainder
        cleaned = clean_prefix(cleaned)
    return cleaned.strip()
# Split on whitespace that follows a sentence-ending mark.
SENT_SPLIT_RE = re.compile(r"(?<=[.!?])\s+")


def split_sentences_fast(text: str) -> List[str]:
    """Split *text* into candidate sentences for retrieval.

    Each piece has meta prefixes removed. Pieces that are empty, look like a
    question, look like a task instruction, or are shorter than 12 characters
    are discarded. No terminal punctuation is added to pieces that lack it.
    """
    kept: List[str] = []
    for piece in SENT_SPLIT_RE.split(text or ""):
        sentence = clean_prefix((piece or "").strip())
        rejected = (
            not sentence
            or QUESTION_LIKE_RE.search(sentence)
            or INSTRUCTION_RE.search(sentence)
            or len(sentence) < 12
        )
        if not rejected:
            kept.append(sentence)
    return kept
# ========= MODEL WARMUP =========
def warmup_models():
    """Load the encoder and the LLM into the module globals if not loaded yet.

    Calling it again after a successful load does nothing.
    """
    global ENCODER_TOKENIZER, ENCODER_MODEL, LLM

    encoder_ready = ENCODER_TOKENIZER is not None and ENCODER_MODEL is not None
    if not encoder_ready:
        log.info(f"[INIT] Load encoder: {ENCODER_NAME} (CPU)")
        ENCODER_TOKENIZER = AutoTokenizer.from_pretrained(ENCODER_NAME)
        encoder = AutoModel.from_pretrained(ENCODER_NAME)
        ENCODER_MODEL = encoder.to(ENCODER_DEVICE).eval()

    if LLM is not None:
        return
    log.info(f"[INIT] Load LLM: {MODEL_PATH} | ctx={CTX_WINDOW} | threads={N_THREADS}")
    LLM = load_model(
        MODEL_PATH,
        n_ctx=CTX_WINDOW,
        n_gpu_layers=N_GPU_LAYERS,
        n_threads=N_THREADS,
    )
# ========= ASSETS =========
@lru_cache(maxsize=8)
def load_subject_assets(subject_key: str) -> "SubjectAssets":
    """Load (and memoize) the FAISS index, chunk texts and embeddings of a subject.

    Raises:
        ValueError: *subject_key* is not a key of ``SUBJECTS``.
        FileNotFoundError: the index, chunk or embedding file does not exist.
        RuntimeError: the index and the embedding matrix differ in size.
    """
    cfg = SUBJECTS.get(subject_key)
    if cfg is None:
        raise ValueError(f"Unknown subject: {subject_key}")
    log.info(f"[ASSETS] Loading subject={subject_key} | index={cfg['index']}")

    # Fail early, in a fixed order, with the missing path as the error payload.
    for required in ("index", "chunks", "embeddings"):
        path = cfg[required]
        if not os.path.exists(path):
            raise FileNotFoundError(path)

    index = faiss.read_index(cfg["index"])
    with open(cfg["chunks"], "r", encoding="utf-8") as fh:
        records = json.load(fh)
    texts = [record.get("text", "") for record in records]
    embs = np.load(cfg["embeddings"])  # (N, dim)

    if index.ntotal != len(embs):
        raise RuntimeError(f"Mismatch ntotal({index.ntotal}) vs emb({len(embs)})")
    return SubjectAssets(index=index, texts=texts, embs=embs)
# ========= ENCODER =========
@torch.inference_mode()
@lru_cache(maxsize=1024)
def encode_query_exact(text: str) -> np.ndarray:
    """Embed *text* with the encoder and return the mean-pooled hidden state.

    Results are memoized per query string, so repeated calls with the same
    text within a request do not run the encoder again. ``warmup_models()``
    must have been called first.
    """
    encoded = ENCODER_TOKENIZER(text, padding=True, truncation=True, return_tensors="pt")
    encoded = encoded.to(ENCODER_DEVICE)
    hidden = ENCODER_MODEL(**encoded).last_hidden_state
    # Average over the token axis; the batch axis is kept.
    pooled = hidden.mean(dim=1)
    return pooled.cpu().numpy()
def cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of *a* and *b*, both flattened to 1-D.

    A small epsilon in the denominator makes an all-zero input yield 0.0
    instead of a division error.
    """
    left = np.ravel(np.asarray(a))
    right = np.ravel(np.asarray(b))
    norm_product = np.linalg.norm(left) * np.linalg.norm(right)
    return float(np.dot(left, right) / (norm_product + 1e-12))
# ========= RETRIEVAL CEPAT =========
def best_cosine_from_faiss(query: str, subject_key: str) -> float:
    """Return the highest cosine similarity between *query* and its FAISS neighbours.

    The similarity is recomputed against the stored embedding matrix. Ids
    outside the range of the chunk list are ignored, and -1.0 is returned
    when no neighbour is usable.
    """
    assets = load_subject_assets(subject_key)
    query_vec = encode_query_exact(query)
    _, neighbour_ids = assets.index.search(query_vec, TOP_K_FAISS)

    flat_query = query_vec.reshape(-1)
    n_texts = len(assets.texts)
    scores = [
        cosine_sim(flat_query, assets.embs[i])
        for i in neighbour_ids[0]
        if 0 <= i < n_texts
    ]
    return max([-1.0, *scores])
def retrieve_top_chunks(query: str, subject_key: str) -> List[str]:
    """Return the texts of the first ``TOP_K_FINAL`` valid FAISS neighbours of *query*."""
    assets = load_subject_assets(subject_key)
    query_vec = encode_query_exact(query)
    _, neighbour_ids = assets.index.search(query_vec, TOP_K_FAISS)

    texts = assets.texts
    hits: List[str] = []
    for i in neighbour_ids[0]:
        # Skip ids that do not point into the chunk list.
        if 0 <= i < len(texts):
            hits.append(texts[i])
    return hits[:TOP_K_FINAL]
# ======= Seleksi kalimat dua-fase (ketat → longgar) =======
def pick_best_sentences_fast(query: str, chunks: List[str], top_k: int = 4) -> List[str]:
    """Pick the *top_k* sentences from *chunks* that best match *query* lexically.

    Every sentence is scored as ``lexical_overlap + length bonus`` (0.05 for
    sentences of 50–220 characters).

    Phase 1: keep sentences whose score (bonus included) reaches MIN_LEXICAL.
    Phase 2 (fallback): when fewer than *top_k* qualify, return the highest
    scoring sentences even if they are below MIN_LEXICAL.

    A sentence that occurs in several chunks is considered only once, so
    duplicates cannot occupy more than one slot. Returns an empty list when
    the chunks yield no candidate sentence.
    """
    cands: List[Tuple[float, str]] = []
    seen = set()
    for chunk in chunks:
        for sentence in split_sentences_fast(chunk):
            cleaned = clean_prefix(sentence)
            if cleaned in seen:
                continue
            seen.add(cleaned)
            overlap = lexical_overlap(query, sentence)
            len_bonus = 0.05 if 50 <= len(sentence) <= 220 else 0.0
            cands.append((overlap + len_bonus, cleaned))

    if not cands:
        log.info("[RAG] Tidak ada kandidat kalimat (split_sentences menghasilkan 0).")
        return []

    # The sort is stable, so ties keep their retrieval order.
    cands.sort(key=lambda pair: pair[0], reverse=True)
    strict = [text for score, text in cands if score + 1e-6 >= MIN_LEXICAL]
    if len(strict) >= top_k:
        return strict[:top_k]
    log.info(f"[RAG] Kalimat relevan < {top_k} pada MIN_LEXICAL={MIN_LEXICAL}; fallback longgar dipakai.")
    return [text for _, text in cands[:top_k]]
# ========= PROMPT =========
def build_prompt(user_query: str, sentences: List[str]) -> str:
    """Build the LLM prompt: rules, one worked example, sources, then the question.

    The model is told to put its answer inside ``<final>...</final>``; the
    caller stops generation at ``</final>`` and parses the text after
    ``<final>``, so the tags must be spelled out in the rules and the example.
    """
    block = "\n".join(f"- {clean_prefix(s)}" for s in sentences)
    system = (
        "Kamu asisten RAG.\n"
        f"- Jika tidak ada kalimat yang relevan, tulis persis: {FALLBACK_TEXT}\n"
        "- Jawab TEPAT 1 kalimat, ringkas, Bahasa Indonesia baku (≥ 6 kata).\n"
        "- Tanpa frasa meta (berdasarkan/menurut/merujuk/mengacu/bersumber).\n"
        "- Tulis jawaban final di dalam tag <final>Jawaban.</final> dan jangan menulis apa pun setelah </final>."
    )
    fewshot = (
        "Contoh format: \n"
        "KALIMAT SUMBER:\n- Air memuai saat dipanaskan.\n"
        "PERTANYAAN: Apa yang terjadi pada air saat dipanaskan?\n"
        "<final>Air akan memuai ketika dipanaskan.</final>\n"
    )
    return (
        f"{system}\n\n{fewshot}\n"
        f"KALIMAT SUMBER:\n{block}\n\n"
        f"PERTANYAAN: {user_query}\n"
        f"TULIS JAWABAN DI DALAM <final>...</final> SAJA:"
    )
@lru_cache(maxsize=1024)
def _validate_input_memo(q: str) -> bool:
    """Memoized guardrail verdict; exceptions propagate and are therefore not cached."""
    return validate_input(q)


def validate_input_cached(q: str) -> bool:
    """Return the guardrail verdict for *q*, reusing earlier verdicts.

    A guardrail failure is logged and treated as a rejection (fail closed).
    Only real verdicts are memoized, so a transient guardrail error does not
    keep rejecting the same query afterwards.
    """
    try:
        return _validate_input_memo(q)
    except Exception as e:
        log.exception(f"[GUARDRAIL] error: {e}")
        return False


# Keep the cache-management handles the lru_cache-decorated version exposed.
validate_input_cached.cache_clear = _validate_input_memo.cache_clear
validate_input_cached.cache_info = _validate_input_memo.cache_info
# ========= AUTH (POSTGRES) =========
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy import create_engine, Column, Integer, String, Text, Boolean, func, or_
from sqlalchemy.orm import sessionmaker, scoped_session, declarative_base, Session
# The connection string is mandatory: importing this module fails without it.
POSTGRES_URL = os.environ.get("POSTGRES_URL")
if not POSTGRES_URL:
    raise RuntimeError("POSTGRES_URL tidak ditemukan. Set di Settings → Variables.")
# Engine and session factory shared by every request; db() below returns sessions from SessionLocal.
engine = create_engine(POSTGRES_URL, pool_pre_ping=True, future=True, echo=False)
SessionLocal = scoped_session(sessionmaker(bind=engine, autoflush=False, autocommit=False, future=True))
# Declarative base class for the ORM models below.
Base = declarative_base()
class User(Base):
    """Application account stored in the ``users`` table."""
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    # Login identifiers; both are unique and indexed.
    username = Column(String(50), unique=True, nullable=False, index=True)
    email = Column(String(120), unique=True, nullable=False, index=True)
    # Holds the generate_password_hash() output written at registration, not the plain password.
    password = Column(Text, nullable=False)
    # Inactive accounts are rejected at login.
    is_active = Column(Boolean, default=True, nullable=False)
    # Copied into the session at login; admin_required checks it for the /admin routes.
    is_admin = Column(Boolean, default=False, nullable=False)
class ChatHistory(Base):
    """One stored chat message in the ``chat_history`` table."""
    __tablename__ = "chat_history"
    id = Column(Integer, primary_key=True)
    # Id of the owning User row; no foreign-key constraint is declared here.
    user_id = Column(Integer, nullable=False, index=True)
    # Subject the message belongs to; ask() stores the subject key from the URL.
    subject_key = Column(String(50), nullable=False, index=True)
    # "user" for the question and "bot" for the answer, as written by ask().
    role = Column(String(10), nullable=False)
    message = Column(Text, nullable=False)
    # Filled in by the database: epoch seconds at insert time.
    timestamp = Column(Integer, server_default=func.extract("epoch", func.now()))
# Create the tables declared on Base at import time.
Base.metadata.create_all(bind=engine)
# All timestamps shown in templates are rendered in Jakarta time.
JKT_TZ = ZoneInfo("Asia/Jakarta")


@app.template_filter("fmt_ts")
def fmt_ts(epoch_int: int):
    """Jinja filter: format epoch seconds as ``DD Mon YYYY HH:MM`` in Jakarta time.

    Returns "-" for any value that cannot be converted.
    """
    try:
        seconds = int(epoch_int)
        return datetime.fromtimestamp(seconds, tz=JKT_TZ).strftime("%d %b %Y %H:%M")
    except Exception:
        return "-"
def db():
    """Return a session from the scoped ``SessionLocal`` registry; callers close it when done."""
    return SessionLocal()
def login_required(view_func):
    """Decorator: run *view_func* only for logged-in sessions, else redirect to the login page."""
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        if session.get("logged_in"):
            return view_func(*args, **kwargs)
        return redirect(url_for("auth_login"))

    return wrapper
def admin_required(view_func):
    """Decorator: run *view_func* only for logged-in admin sessions.

    Anonymous visitors are redirected to the login page; logged-in non-admins
    get a flash message and are redirected to the subject list.
    """
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        if not session.get("logged_in"):
            return redirect(url_for("auth_login"))
        if session.get("is_admin"):
            return view_func(*args, **kwargs)
        flash("Hanya admin yang boleh mengakses halaman itu.", "error")
        return redirect(url_for("subjects"))

    return wrapper
# ========= ROUTES =========
@app.route("/")
def root():
    """Redirect the site root to the login page."""
    return redirect(url_for("auth_login"))
@app.route("/auth/login", methods=["GET", "POST"])
def auth_login():
    """Show the login form (GET) or authenticate a user (POST).

    The identity field may hold a username or an email and is compared
    case-insensitively. Responds with 400 when a field is missing and 401 when
    the credentials are wrong or the account is inactive. On success the
    session is populated and the user is redirected to the subject list.
    """
    if request.method != "POST":
        return render_template("login.html")

    form = request.form
    raw_identity = form.get("identity") or form.get("email") or form.get("username") or ""
    identity = raw_identity.strip().lower()
    pw_input = (form.get("password") or "").strip()
    if not (identity and pw_input):
        flash("Mohon isi email/username dan password.", "error")
        return render_template("login.html"), 400

    s = db()
    try:
        matches_identity = or_(
            func.lower(User.username) == identity,
            func.lower(User.email) == identity,
        )
        user = s.query(User).filter(matches_identity).first()
        log.info(f"[LOGIN] identity='{identity}' found={bool(user)} active={getattr(user,'is_active',None)}")
        ok = bool(user and user.is_active and check_password_hash(user.password, pw_input))
    finally:
        s.close()

    if not ok:
        flash("Identitas atau password salah.", "error")
        return render_template("login.html"), 401

    # Record who is logged in; is_admin is what admin_required checks.
    session["logged_in"] = True
    session["user_id"] = user.id
    session["username"] = user.username
    session["is_admin"] = bool(user.is_admin)
    log.info(f"[LOGIN] OK user_id={user.id}; session set.")
    return redirect(url_for("subjects"))
@app.route("/whoami")
def whoami():
    """Return the identity fields stored in the current session."""
    identity = {key: session.get(key) for key in ("user_id", "username", "is_admin")}
    return {"logged_in": bool(session.get("logged_in")), **identity}
@app.route("/auth/register", methods=["GET", "POST"])
def auth_register():
    """Show the registration form (GET) or create a new account (POST).

    Username and email are stored lower-cased. Responds with 400 for invalid
    input and 409 when the username or email is already taken. On success the
    user is redirected to the login page.
    """
    # Local import: only this view needs the exception type.
    from sqlalchemy.exc import IntegrityError

    if request.method != "POST":
        return render_template("register.html")

    username = (request.form.get("username") or "").strip().lower()
    email = (request.form.get("email") or "").strip().lower()
    pw = (request.form.get("password") or "").strip()
    confirm = (request.form.get("confirm") or "").strip()

    if not username or not email or not pw:
        flash("Semua field wajib diisi.", "error")
        return render_template("register.html"), 400
    if len(pw) < 6:
        flash("Password minimal 6 karakter.", "error")
        return render_template("register.html"), 400
    if pw != confirm:
        flash("Konfirmasi password tidak cocok.", "error")
        return render_template("register.html"), 400

    s = db()
    try:
        existed = (
            s.query(User)
            .filter(or_(func.lower(User.username) == username, func.lower(User.email) == email))
            .first()
        )
        if existed:
            flash("Username/Email sudah terpakai.", "error")
            return render_template("register.html"), 409

        s.add(User(username=username, email=email, password=generate_password_hash(pw), is_active=True))
        try:
            s.commit()
        except IntegrityError:
            # Another request inserted the same username/email between the check
            # above and this commit; report it as a conflict instead of failing.
            s.rollback()
            flash("Username/Email sudah terpakai.", "error")
            return render_template("register.html"), 409
    finally:
        s.close()

    flash("Registrasi berhasil. Silakan login.", "success")
    return redirect(url_for("auth_login"))
@app.route("/auth/logout")
def auth_logout():
    """Drop everything stored in the session and go back to the login page."""
    session.clear()
    return redirect(url_for("auth_login"))
@app.route("/about")
def about():
    """Render the about page; no login is required."""
    return render_template("about.html")
@app.route("/subjects")
@login_required
def subjects():
    """Render the subject picker for a logged-in user."""
    # Debug aid: records which session reached this page.
    log.info(f"[SESSION DEBUG] logged_in={session.get('logged_in')} user_id={session.get('user_id')}")
    return render_template("home.html", subjects=SUBJECTS)
# The URL variable supplies the view's required ``subject_key`` argument.
@app.route("/chat/<subject_key>")
@login_required
def chat_subject(subject_key: str):
    """Render the chat page of a subject with the current user's stored history.

    Unknown subject keys are redirected to the subject list.
    """
    if subject_key not in SUBJECTS:
        return redirect(url_for("subjects"))

    session["subject_selected"] = subject_key
    label = SUBJECTS[subject_key]["label"]

    s = db()
    try:
        uid = session.get("user_id")
        rows = (
            s.query(ChatHistory)
            .filter_by(user_id=uid, subject_key=subject_key)
            .order_by(ChatHistory.id.asc())
            .all()
        )
        # Copy into plain dicts while the session is still open.
        history = [{"role": r.role, "message": r.message} for r in rows]
    finally:
        s.close()
    return render_template("chat.html", subject=subject_key, subject_label=label, history=history)
@app.route("/health")
def health():
    """Report whether the models are loaded, plus the active model settings."""
    status = {
        "ok": True,
        "encoder_loaded": ENCODER_MODEL is not None,
        "llm_loaded": LLM is not None,
        "model_path": MODEL_PATH,
        "ctx_window": CTX_WINDOW,
        "threads": N_THREADS,
    }
    return jsonify(status)
# Markers of the answer format requested in the prompt, and the patterns used to parse it.
_FINAL_CLOSE_TAG = "</final>"
_THINK_BLOCK_RE = re.compile(r"<think\b[^>]*>.*?</think>", re.DOTALL | re.IGNORECASE)
_THINK_TAG_RE = re.compile(r"</?think\b[^>]*>", re.IGNORECASE)
_FINAL_BODY_RE = re.compile(r"<final>\s*(.+)$", re.IGNORECASE | re.DOTALL)
_FINAL_CLOSE_RE = re.compile(r"</final>", re.IGNORECASE)
_ANY_TAG_RE = re.compile(r"<[^>]+>")
_FIRST_SENTENCE_RE = re.compile(r"(.+?[.!?])(\s|$)")
_ALPHA_TOKEN_RE = re.compile(r"[A-Za-zÀ-ÖØ-öø-ÿ]+")
_PLACEHOLDER_ANSWERS = {"...", ".", "..", "…"}
_UNIT_TOKENS = {"newton", "n", "kg", "m", "s"}


def _extract_final_answer(raw: str) -> str:
    """Return the answer text from a raw completion: drop reasoning, unwrap <final>."""
    text = _THINK_BLOCK_RE.sub("", raw or "").strip()
    text = _THINK_TAG_RE.sub("", text).strip()
    found = _FINAL_BODY_RE.search(text)
    if not found:
        # No <final> tag at all: fall back to the text with every tag removed.
        return _ANY_TAG_RE.sub("", text).strip()
    # Ignore anything written after a closing tag the stop sequence did not cut off.
    body = _FINAL_CLOSE_RE.split(found.group(1), maxsplit=1)[0]
    return body.strip()


def _is_bad_answer(candidate: str) -> bool:
    """Return True when *candidate* is empty, a placeholder, or too short to be an answer."""
    text = (candidate or "").strip()
    if not text or text in _PLACEHOLDER_ANSWERS:
        return True
    words = _ALPHA_TOKEN_RE.findall(text)
    if len(words) >= 4:
        return False
    # Short answers are accepted only when they look like a quantity with a unit.
    has_unit = any(word.lower() in _UNIT_TOKENS for word in words)
    return not (has_unit and len(words) >= 3)


def _generate_once(prompt: str, temperature: float, pass_no: int) -> str:
    """Run one generation pass and return the extracted answer text."""
    raw = generate(
        LLM,
        prompt,
        max_tokens=int(os.environ.get("MAX_TOKENS", 64)),
        temperature=temperature,
        top_p=1.0,
        stop=[_FINAL_CLOSE_TAG],
    ) or ""
    raw = raw.strip()
    log.info(f"[LLM] Raw answer repr (pass{pass_no}): {repr(raw)}")
    return _extract_final_answer(raw)


def _save_chat_turn(uid, subject_key: str, query: str, answer: str) -> None:
    """Store the question and the answer; failures are logged, never raised (best effort)."""
    s = None
    try:
        s = db()
        s.add_all([
            ChatHistory(user_id=uid, subject_key=subject_key, role="user", message=query),
            ChatHistory(user_id=uid, subject_key=subject_key, role="bot", message=answer),
        ])
        s.commit()
    except Exception as e:
        log.exception(f"[DB] gagal simpan chat history: {e}")
    finally:
        if s is not None:
            try:
                s.close()
            except Exception:
                log.exception("[DB] gagal menutup session")


# The URL variable supplies the view's required ``subject_key`` argument.
@app.route("/ask/<subject_key>", methods=["POST"])
@login_required
def ask(subject_key: str):
    """Answer one question for a subject through the RAG pipeline.

    Expects a JSON body ``{"message": "<question>"}``. Steps: guardrail check,
    FAISS retrieval gated by MIN_COSINE, sentence selection, LLM generation
    (with one retry when the first answer is unusable), post-processing to a
    single sentence, and storing the turn in the chat history.

    Returns JSON ``{"ok": true, "answer": ...}``. The answer is FALLBACK_TEXT
    whenever nothing relevant is found or generation fails. HTTP 400 is
    returned for an unknown subject or an empty question, and 500 when the
    subject assets cannot be loaded.
    """
    if subject_key not in SUBJECTS:
        return jsonify({"ok": False, "error": "invalid subject"}), 400

    data = request.get_json(silent=True)
    message = data.get("message") if isinstance(data, dict) else None
    query = message.strip() if isinstance(message, str) else ""
    if not query:
        return jsonify({"ok": False, "error": "empty query"}), 400

    warmup_models()
    t0 = time.perf_counter()

    if not validate_input_cached(query):
        return jsonify({"ok": True, "answer": GUARDRAIL_BLOCK_TEXT})

    try:
        load_subject_assets(subject_key)
    except Exception as e:
        log.exception(f"[ASSETS] error: {e}")
        return jsonify({"ok": False, "error": f"subject assets error: {e}"}), 500

    best = best_cosine_from_faiss(query, subject_key)
    log.info(f"[RAG] Subject={subject_key.upper()} | Best cosine={best:.3f}")
    if best < MIN_COSINE:
        log.info(f"[RAG] Fallback by cosine: {best:.3f} < {MIN_COSINE}")
        return jsonify({"ok": True, "answer": FALLBACK_TEXT})

    chunks = retrieve_top_chunks(query, subject_key)
    if not chunks:
        log.info("[RAG] Fallback by chunks=0")
        return jsonify({"ok": True, "answer": FALLBACK_TEXT})

    sentences = pick_best_sentences_fast(query, chunks, top_k=5)
    log.info(f"[RAG] sentences_selected={len(sentences)} (min_lex={MIN_LEXICAL}, top_k={5})")
    if not sentences:
        log.info("[RAG] Fallback by sentences=0")
        return jsonify({"ok": True, "answer": FALLBACK_TEXT})

    prompt = build_prompt(query, sentences)
    try:
        # PASS-1: low temperature, short output.
        answer = _generate_once(prompt, float(os.environ.get("TEMP", 0.2)), 1)
        if _is_bad_answer(answer):
            # PASS-2: repeat with an explicit reminder of the required format.
            prompt_retry = (
                prompt
                + "\n\nULANGI DENGAN TAAT FORMAT: "
                "Tulis satu kalimat faktual tanpa placeholder/ellipsis, "
                "mulai huruf kapital dan akhiri titik. "
                "Tulis hanya di dalam <final>...</final>."
            )
            answer = _generate_once(prompt_retry, 0.2, 2) or answer
    except Exception as e:
        log.exception(f"[LLM] generate error: {e}")
        return jsonify({"ok": True, "answer": FALLBACK_TEXT})

    # Keep only the first sentence, then drop meta lead-ins.
    first = _FIRST_SENTENCE_RE.search(answer)
    answer = (first.group(1) if first else answer).strip()
    answer = strip_meta_sentence(answer)

    # Decide the final text before storing it, so the history matches what the user sees.
    if not answer or len(answer) < 2:
        answer = FALLBACK_TEXT

    _save_chat_turn(session.get("user_id"), subject_key, query, answer)

    if ENABLE_PROFILING:
        log.info({
            "latency_total": time.perf_counter() - t0,
            "subject": subject_key,
            "faiss_best": best,
        })
    return jsonify({"ok": True, "answer": answer})
# ===== Admin =====
@app.route("/admin")
@admin_required
def admin_dashboard():
    """Render the admin dashboard with user and message totals."""
    s = db()
    try:
        user_count = s.query(func.count(User.id))
        stats = {
            "total_users": user_count.scalar() or 0,
            "total_active": user_count.filter(User.is_active.is_(True)).scalar() or 0,
            "total_admins": user_count.filter(User.is_admin.is_(True)).scalar() or 0,
            "total_msgs": s.query(func.count(ChatHistory.id)).scalar() or 0,
        }
    finally:
        s.close()
    return render_template("admin_dashboard.html", **stats)
@app.route("/admin/users")
@admin_required
def admin_users():
    """Render a paginated, optionally filtered list of users with their message counts.

    Query parameters: ``q`` (substring of username/email), ``page`` (>= 1) and
    ``per_page`` (clamped to 5–100). Non-numeric paging values fall back to
    their defaults instead of failing the request.
    """
    q = (request.args.get("q") or "").strip().lower()
    # type=int makes a malformed value return the default rather than raise.
    page = max(request.args.get("page", 1, type=int), 1)
    per_page = min(max(request.args.get("per_page", 20, type=int), 5), 100)

    s = db()
    try:
        base = s.query(User)
        if q:
            pattern = f"%{q}%"
            base = base.filter(or_(func.lower(User.username).like(pattern), func.lower(User.email).like(pattern)))
        total = base.count()
        users = base.order_by(User.id.asc()).offset((page - 1) * per_page).limit(per_page).all()
        # [-1] keeps the IN (...) clause non-empty when the page has no users.
        user_ids = [u.id for u in users] or [-1]
        counts = dict(
            s.query(ChatHistory.user_id, func.count(ChatHistory.id))
            .filter(ChatHistory.user_id.in_(user_ids))
            .group_by(ChatHistory.user_id)
            .all()
        )
    finally:
        s.close()
    return render_template("admin_users.html", users=users, counts=counts, q=q, page=page, per_page=per_page, total=total)
@app.route("/admin/history")
@admin_required
def admin_history():
    """Render a paginated, filterable view of all chat messages, newest first.

    Query parameters: ``q`` (substring of the message), ``username`` (exact
    username or email), ``subject``, ``role`` ("user" or "bot"), ``page``
    (>= 1) and ``per_page`` (clamped to 5–200). Non-numeric paging values fall
    back to their defaults instead of failing the request.
    """
    q = (request.args.get("q") or "").strip().lower()
    username = (request.args.get("username") or "").strip().lower()
    subject = (request.args.get("subject") or "").strip().lower()
    role = (request.args.get("role") or "").strip().lower()
    # type=int makes a malformed value return the default rather than raise.
    page = max(request.args.get("page", 1, type=int), 1)
    per_page = min(max(request.args.get("per_page", 30, type=int), 5), 200)

    s = db()
    try:
        base = s.query(ChatHistory, User).join(User, User.id == ChatHistory.user_id)
        if q:
            base = base.filter(func.lower(ChatHistory.message).like(f"%{q}%"))
        if username:
            base = base.filter(or_(func.lower(User.username) == username, func.lower(User.email) == username))
        if subject:
            base = base.filter(func.lower(ChatHistory.subject_key) == subject)
        if role in ("user", "bot"):
            base = base.filter(ChatHistory.role == role)
        total = base.count()
        rows = base.order_by(ChatHistory.id.desc()).offset((page - 1) * per_page).limit(per_page).all()
    finally:
        s.close()

    items = [{
        "id": r.ChatHistory.id,
        "username": r.User.username,
        "email": r.User.email,
        "subject": r.ChatHistory.subject_key,
        "role": r.ChatHistory.role,
        "message": r.ChatHistory.message,
        "timestamp": r.ChatHistory.timestamp,
    } for r in rows]
    return render_template("admin_history.html", items=items, subjects=SUBJECTS, q=q, username=username, subject=subject, role=role, page=page, per_page=per_page, total=total)
def _is_last_admin(s: Session) -> bool:
    """Return True when at most one admin account exists."""
    admin_total = s.query(func.count(User.id)).filter(User.is_admin.is_(True)).scalar()
    return (admin_total or 0) <= 1
# The URL variable supplies the view's required integer ``user_id`` argument.
@app.route("/admin/users/<int:user_id>/delete", methods=["POST"])
@admin_required
def admin_delete_user(user_id: int):
    """Delete a user together with all of that user's chat history.

    Refuses to delete the account that is currently logged in, and the last
    remaining admin. Always redirects back to the referring page (or the user
    list) with a flash message describing the outcome.
    """
    s = db()
    try:
        me_id = session.get("user_id")
        user = s.query(User).filter_by(id=user_id).first()
        if not user:
            flash("User tidak ditemukan.", "error")
            return redirect(request.referrer or url_for("admin_users"))
        if user.id == me_id:
            flash("Tidak bisa menghapus akun yang sedang login.", "error")
            return redirect(request.referrer or url_for("admin_users"))
        if user.is_admin and _is_last_admin(s):
            flash("Tidak bisa menghapus admin terakhir.", "error")
            return redirect(request.referrer or url_for("admin_users"))

        # Remove the history first; both deletions are committed together.
        s.query(ChatHistory).filter(ChatHistory.user_id == user.id).delete(synchronize_session=False)
        s.delete(user)
        s.commit()
        flash(f"User #{user_id} beserta seluruh riwayatnya telah dihapus.", "success")
    except Exception as e:
        s.rollback()
        log.exception(f"[ADMIN] delete user error: {e}")
        flash("Gagal menghapus user.", "error")
    finally:
        s.close()
    return redirect(request.referrer or url_for("admin_users"))
# The URL variable supplies the view's required integer ``user_id`` argument.
@app.route("/admin/users/<int:user_id>/history/clear", methods=["POST"])
@admin_required
def admin_clear_user_history(user_id: int):
    """Delete every chat message of one user, keeping the account itself.

    Always redirects back to the referring page (or the history list) with a
    flash message describing the outcome.
    """
    s = db()
    try:
        exists = s.query(User.id).filter_by(id=user_id).first()
        if not exists:
            flash("User tidak ditemukan.", "error")
            return redirect(request.referrer or url_for("admin_history"))

        deleted = s.query(ChatHistory).filter(ChatHistory.user_id == user_id).delete(synchronize_session=False)
        s.commit()
        flash(f"Riwayat chat user #{user_id} dihapus ({deleted} baris).", "success")
    except Exception as e:
        s.rollback()
        log.exception(f"[ADMIN] clear history error: {e}")
        flash("Gagal menghapus riwayat.", "error")
    finally:
        s.close()
    return redirect(request.referrer or url_for("admin_history"))
# The URL variable supplies the view's required integer ``chat_id`` argument.
@app.route("/admin/history/<int:chat_id>/delete", methods=["POST"])
@admin_required
def admin_delete_chat(chat_id: int):
    """Delete a single chat-history row.

    Always redirects back to the referring page (or the history list) with a
    flash message describing the outcome.
    """
    s = db()
    try:
        row = s.query(ChatHistory).filter_by(id=chat_id).first()
        if not row:
            flash("Baris riwayat tidak ditemukan.", "error")
            return redirect(request.referrer or url_for("admin_history"))

        s.delete(row)
        s.commit()
        flash(f"Riwayat chat #{chat_id} dihapus.", "success")
    except Exception as e:
        s.rollback()
        log.exception(f"[ADMIN] delete chat error: {e}")
        flash("Gagal menghapus riwayat.", "error")
    finally:
        s.close()
    return redirect(request.referrer or url_for("admin_history"))
# ========= ENTRY =========
# Start Flask's built-in server when this file is executed directly.
if __name__ == "__main__":
    # PORT can override the default of 7860.
    port = int(os.environ.get("PORT", 7860))
    # Listen on all interfaces, with the debugger disabled.
    app.run(host="0.0.0.0", port=port, debug=False)