import sys
import os
import time
import requests
import json
import pickle
import streamlit as st
import chromadb
from typing import Any, List, Optional
# --- WINDOWS PATCH ---
# `resource` is a POSIX-only stdlib module that some dependencies import at
# startup; on Windows, register a minimal stub exposing the calls they need.
if os.name == 'nt':
    try:
        import resource
    except ImportError:
        class Resource:
            RLIMIT_NOFILE = 0
            def getrlimit(self, resource, limits=None):
                return (2048, 2048)
            def setrlimit(self, resource, limits):
                pass
        sys.modules['resource'] = Resource()
# --- LlamaIndex Imports ---
from llama_index.core import VectorStoreIndex, StorageContext, Settings
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.llms import CustomLLM, CompletionResponse, LLMMetadata
from llama_index.core.llms.callbacks import llm_completion_callback
from llama_index.core.prompts import PromptTemplate
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.schema import TextNode, NodeWithScore, QueryBundle
from llama_index.core.postprocessor.types import BaseNodePostprocessor
from llama_index.llms.openrouter import OpenRouter
from llama_index.core.base.llms.types import ChatMessage, MessageRole
from llama_index.core.memory import ChatMemoryBuffer
# --- PATH & VARIABLE CONFIGURATION ---
CHROMA_DB_PATH = "vector_db"
COLLECTION_NAME = "chroma_db_with_konteks_8b"
BM25_PICKLE_FILE = "saved_bm25_retriever.pkl"
# --- SECRETS MANAGEMENT ---
required_secrets = ["RUNPOD_API_KEY", "RUNPOD_ENDPOINT_ID", "OPENROUTER_API_KEY", "JINA_API_KEY"]
missing_secrets = [key for key in required_secrets if key not in st.secrets]
if missing_secrets:
    st.error(f"Missing Secrets: {', '.join(missing_secrets)}")
    st.stop()
RUNPOD_API_KEY = st.secrets["RUNPOD_API_KEY"]
RUNPOD_ENDPOINT_ID = st.secrets["RUNPOD_ENDPOINT_ID"]
RUNPOD_API_URL = f"https://api.runpod.ai/v2/{RUNPOD_ENDPOINT_ID}/run"
RUNPOD_STATUS_URL = f"https://api.runpod.ai/v2/{RUNPOD_ENDPOINT_ID}/status/"
OPENROUTER_API_KEY = st.secrets["OPENROUTER_API_KEY"]
EMBEDDING_MODEL_NAME = "qwen/qwen3-embedding-8b"
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
JINA_API_KEY = st.secrets["JINA_API_KEY"]
JINA_MODEL_NAME = "jina-reranker-v3"
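# For reference, the secrets read above are expected in `.streamlit/secrets.toml`.
# A minimal sketch with placeholder values (the real values are deployment-specific):
#
#   RUNPOD_API_KEY = "..."
#   RUNPOD_ENDPOINT_ID = "..."
#   OPENROUTER_API_KEY = "..."
#   JINA_API_KEY = "..."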
# --- PROMPT TEMPLATES ---
TEXT_QA_TEMPLATE_STR = (
    "Anda adalah Asisten Customer Service Universitas Telkom.\n"
    "Jawablah pertanyaan pengguna HANYA berdasarkan Dokumen Pendukung yang diberikan di bawah ini.\n"
    "Jika jawaban tidak ditemukan di dalam dokumen, katakan bahwa Anda tidak mengetahui informasinya dengan sopan.\n\n"
    "Dokumen Pendukung:\n"
    "{context_str}\n\n"
    "Pertanyaan: {query_str}\n"
    "Jawaban:\n"
)
CONDENSE_QUESTION_PROMPT_STR = (
    "Tugas: Tulis ulang pertanyaan lanjutan pengguna menjadi kueri pencarian yang berdiri sendiri (standalone) dalam Bahasa Indonesia.\n"
    "Berdasarkan Riwayat Percakapan, perjelas konteks atau rujukan pengguna (contoh: ganti kata ganti seperti 'itu', 'dia', 'tersebut' dengan kata benda yang spesifik).\n"
    "Jika pertanyaan baru merupakan pergantian topik, tuliskan pertanyaan tersebut apa adanya.\n"
    "JANGAN menjawab pertanyaan tersebut. HANYA berikan kueri yang telah ditulis ulang.\n\n"
    "Riwayat Percakapan:\n"
    "{chat_history}\n\n"
    "Pertanyaan Lanjutan: {question}\n\n"
)
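# An illustrative (hypothetical) example of what the condense prompt should do:
# given the history "User: Siapa dekan Fakultas Informatika? / Assistant: ..."
# and the follow-up "Apa saja program studinya?", the expected standalone query
# is roughly "Apa saja program studi di Fakultas Informatika?".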
# --- CLASS 1: Custom Jina Reranker ---
class JinaRerankCustom(BaseNodePostprocessor):
    """Reranks retrieved nodes with the Jina Rerank REST API."""
    api_key: str
    model: str = JINA_MODEL_NAME
    top_n: int = 5

    def _postprocess_nodes(self, nodes: List[NodeWithScore], query_bundle: Optional[QueryBundle] = None) -> List[NodeWithScore]:
        if not nodes or query_bundle is None:
            return nodes[:self.top_n]
        documents_content = [node.node.get_content() for node in nodes]
        url = "https://api.jina.ai/v1/rerank"
        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
        payload = {
            "model": self.model,
            "query": query_bundle.query_str,
            "top_n": self.top_n,
            "documents": documents_content,
        }
        try:
            response = requests.post(url, headers=headers, json=payload, timeout=60)
            response.raise_for_status()
            results = response.json().get('results', [])
            # Map Jina's ranked indices back onto the original nodes and
            # carry over the new relevance scores.
            new_nodes = []
            for result in results:
                original_node = nodes[result['index']]
                original_node.score = result['relevance_score']
                new_nodes.append(original_node)
            return new_nodes
        except Exception as e:
            # Degrade gracefully: surface the error and fall back to the
            # un-reranked top_n instead of failing the whole query.
            st.error(f"Reranker Error: {e}")
            return nodes[:self.top_n]
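# A standalone usage sketch for the reranker (hypothetical inputs, shown only
# as a comment; in this app LlamaIndex invokes it through the
# `node_postprocessors` hook of the query engine built below):
#
#   reranker = JinaRerankCustom(api_key=JINA_API_KEY, top_n=5)
#   top5 = reranker.postprocess_nodes(candidate_nodes, QueryBundle("jadwal wisuda"))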
# --- CLASS 2: Custom RunPod LLM ---
class RunPodLLM(CustomLLM):
    """LlamaIndex LLM wrapper around a RunPod serverless vLLM endpoint."""
    api_key: str = RUNPOD_API_KEY
    endpoint_id: str = RUNPOD_ENDPOINT_ID
    max_tokens: int = 2048
    temperature: float = 0.3
    context_window: int = 8192

    @property
    def metadata(self) -> LLMMetadata:
        return LLMMetadata(context_window=self.context_window, num_output=self.max_tokens, model_name="runpod-vllm")

    @llm_completion_callback()
    def complete(self, prompt: str, **kwargs: Any) -> CompletionResponse:
        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
        payload = {
            "input": {
                "prompt": prompt,
                "sampling_params": {"temperature": self.temperature, "max_tokens": self.max_tokens, "repetition_penalty": 1.25, "top_p": 0.8},
            }
        }
        try:
            res = requests.post(RUNPOD_API_URL, headers=headers, json=payload, timeout=30)
            res.raise_for_status()
            job_id = res.json().get("id")
            # Poll the async job until it leaves the queue/progress states
            status = "IN_QUEUE"
            while status in ["IN_QUEUE", "IN_PROGRESS"]:
                time.sleep(1)
                status_res = requests.get(f"{RUNPOD_STATUS_URL}{job_id}", headers=headers, timeout=30).json()
                status = status_res.get("status")
            if status == "COMPLETED":
                output = status_res.get("output")
                # Parsing logic: `output` may be a list, and each choice may
                # carry a `tokens` list or a plain `text` field
                if isinstance(output, list):
                    output = output[0]
                if isinstance(output, dict) and "choices" in output:
                    choice = output["choices"][0]
                    text = "".join(choice.get("tokens", [])) if "tokens" in choice else choice.get("text", "")
                    return CompletionResponse(text=text)
                return CompletionResponse(text=str(output))
            return CompletionResponse(text=f"Job Failed (status: {status})")
        except Exception as e:
            return CompletionResponse(text=f"Error: {e}")

    def stream_complete(self, prompt: str, **kwargs: Any) -> Any:
        raise NotImplementedError("Streaming is not supported by this wrapper.")
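# The parsing in `complete` assumes a RunPod vLLM worker response shaped roughly
# like the following (an illustrative example, not captured from a live job):
#
#   {"status": "COMPLETED",
#    "output": [{"choices": [{"tokens": ["Halo", ", ", "..."]}]}]}
#
# i.e. `output` may arrive as a single-element list, and each choice carries
# either a `tokens` list to be joined or a plain `text` field.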
# --- CLASS 3: Dual Model Chat Engine (The Logic Wrapper) ---
class DualModelChatEngine:
    """Condenses follow-up questions with a fast LLM, then answers them through the RAG query engine."""

    def __init__(self, query_engine, condense_llm, memory):
        self.query_engine = query_engine
        self.condense_llm = condense_llm
        self.memory = memory
        self.condense_prompt_template = PromptTemplate(CONDENSE_QUESTION_PROMPT_STR)

    def chat(self, message: str):
        # 1. Build a history string from the last six messages
        history_str = ""
        current_history = self.memory.get()
        for msg in current_history[-6:]:
            role = "User" if msg.role == MessageRole.USER else "Assistant"
            history_str += f"{role}: {msg.content}\n"
        # 2. Condense the query into a standalone one (using Grok via OpenRouter)
        if not history_str:
            rewritten_query = message
        else:
            condense_prompt = self.condense_prompt_template.format(chat_history=history_str, question=message)
            rewritten_query = self.condense_llm.complete(condense_prompt).text.strip()
        # 3. Query the RAG system (Vector + BM25 + Jina + RunPod)
        response = self.query_engine.query(rewritten_query)
        # 4. Update memory
        self.memory.put(ChatMessage(role=MessageRole.USER, content=message))
        self.memory.put(ChatMessage(role=MessageRole.ASSISTANT, content=str(response)))
        return response, rewritten_query
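# A usage sketch (hypothetical queries) showing the two-step flow: the first
# turn passes through unchanged, while follow-ups are rewritten into standalone
# queries before retrieval.
#
#   engine = get_chat_engine()
#   resp, q1 = engine.chat("Apa itu LAK Fakultas Informatika?")
#   resp2, q2 = engine.chat("Bagaimana cara mendaftarnya?")  # rewritten using turn 1 as context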
# --- SETUP ENGINE ---
@st.cache_resource
def get_chat_engine():
    # 1. Models
    embed_model = OpenAIEmbedding(model_name=EMBEDDING_MODEL_NAME, api_base=OPENROUTER_BASE_URL, api_key=OPENROUTER_API_KEY)
    llm_generate = RunPodLLM()
    llm_condense = OpenRouter(api_key=OPENROUTER_API_KEY, model="x-ai/grok-4.1-fast")
    Settings.llm = llm_generate
    Settings.embed_model = embed_model
    # 2. Database
    db_client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    chroma_collection = db_client.get_collection(COLLECTION_NAME)
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    index = VectorStoreIndex.from_vector_store(vector_store)
    # 3. Retrievers
    v_retriever = index.as_retriever(similarity_top_k=30)
    # BM25 loader: the pickle (if present) stores the node list used to build
    # the retriever; otherwise the nodes are rebuilt from the Chroma collection
    if os.path.exists(BM25_PICKLE_FILE):
        with open(BM25_PICKLE_FILE, 'rb') as f:
            nodes = pickle.load(f)
    else:
        all_d = chroma_collection.get()
        nodes = [TextNode(text=t, id_=all_d['ids'][i], metadata=all_d['metadatas'][i]) for i, t in enumerate(all_d['documents']) if t]
    bm25_retriever = BM25Retriever.from_defaults(nodes=nodes, similarity_top_k=30)
    fusion_retriever = QueryFusionRetriever(
        [v_retriever, bm25_retriever],
        similarity_top_k=30, num_queries=1, mode="reciprocal_rerank", use_async=True
    )
    # 4. Reranker & Engine
    reranker = JinaRerankCustom(api_key=JINA_API_KEY, top_n=5)
    query_engine = RetrieverQueryEngine.from_args(
        retriever=fusion_retriever,
        node_postprocessors=[reranker],
        text_qa_template=PromptTemplate(TEXT_QA_TEMPLATE_STR)
    )
    # 5. Wrap everything in the dual-model chat engine
    memory = ChatMemoryBuffer.from_defaults(token_limit=3000)
    return DualModelChatEngine(query_engine, llm_condense, memory)
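# A minimal offline sketch (assumed workflow) for producing the BM25 pickle:
# despite its filename, the loader above expects a pickled *node list*, which
# `BM25Retriever.from_defaults` then indexes at startup.
#
#   coll = chromadb.PersistentClient(path=CHROMA_DB_PATH).get_collection(COLLECTION_NAME)
#   data = coll.get()
#   nodes = [TextNode(text=t, id_=data['ids'][i], metadata=data['metadatas'][i])
#            for i, t in enumerate(data['documents']) if t]
#   with open(BM25_PICKLE_FILE, 'wb') as f:
#       pickle.dump(nodes, f)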
# --- UI STREAMLIT ---
st.set_page_config(page_title="Telkom Univ AI", page_icon="πŸŽ“")
st.title("πŸŽ“ Telkom University AI Assistant")
# Cold start information
st.info("ℹ️ **Pemberitahuan:** Request pertama mungkin memakan waktu **1-2 menit (Cold Start)** untuk menyalakan server. Balasan berikutnya akan lebih cepat!", icon="⏳")
try:
    chat_engine = get_chat_engine()
except Exception as e:
    st.error(f"Init Error: {e}")
    st.stop()
# Initialize Session State
if "messages" not in st.session_state:
st.session_state.messages = [{"role": "assistant", "content": "Halo! Ada yang bisa saya bantu terkait info Universitas Telkom dan LAK Fakultas Informatika?"}]
# Render Chat History
for msg in st.session_state.messages:
with st.chat_message(msg["role"]):
if msg["role"] == "user":
st.markdown(f"<div style='text-align: right'>{msg['content']}</div>", unsafe_allow_html=True)
else:
st.markdown(msg["content"])
# Handle Input
if prompt := st.chat_input("Tanyakan sesuatu..."):
    # 1. Append the user message to history
    st.session_state.messages.append({"role": "user", "content": prompt})
    # 2. Display the user message immediately
    with st.chat_message("user"):
        st.markdown(f"<div style='text-align: right'>{prompt}</div>", unsafe_allow_html=True)
    # 3. Generate the assistant response
    with st.chat_message("assistant"):
        # Flag tracking whether processing succeeded or raised an error
        processing_success = False
        # Placeholder so the answer renders cleanly in one spot
        response_placeholder = st.empty()
        # --- PROCESSING BLOCK (WITH SPINNER) ---
        with st.spinner("Sedang memproses... (Mohon tunggu jika Cold Start)"):
            try:
                # Chat engine logic; `rewritten` holds the standalone query (useful for debugging)
                response, rewritten = chat_engine.chat(prompt)
                # Show the answer in the placeholder
                response_placeholder.markdown(str(response))
                # Persist it to session-state history
                st.session_state.messages.append({"role": "assistant", "content": str(response)})
                # Reaching this line means the whole pipeline succeeded
                processing_success = True
            except Exception as e:
                st.error(f"Error: {e}")
        # --- COUNTDOWN BLOCK (OUTSIDE THE SPINNER) ---
        # Only run the countdown if processing above succeeded
        if processing_success:
            countdown_container = st.empty()
            for i in range(60, -1, -1):
                # Refresh the warning message every second
                countdown_container.warning(
                    f"⚑ **Server akan Sleep dalam {i} detik.**",
                    icon="⚠️"
                )
                time.sleep(1)
            # Message shown once the timer runs out
            countdown_container.error("πŸ’€ **Server telah masuk mode Sleep.** Request berikutnya akan memakan waktu 1-2 menit (Cold Start).", icon="πŸ›‘")