import os
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import streamlit as st
import umap
from sklearn.mixture import GaussianMixture

from langchain_chroma import Chroma
from langchain_community.document_loaders import TextLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_huggingface import HuggingFaceEmbeddings

def global_cluster_embeddings(
embeddings: np.ndarray,
dim: int,
n_neighbors: Optional[int] = None,
metric: str = "cosine",
) -> np.ndarray:
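    """Reduce embeddings to `dim` dimensions with UMAP before global clustering.

    If `n_neighbors` is omitted, it defaults to sqrt(n - 1) so the
    neighborhood size scales with the number of embeddings.
    """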
if n_neighbors is None:
n_neighbors = int((len(embeddings) - 1) ** 0.5)
return umap.UMAP(
n_neighbors=n_neighbors, n_components=dim, metric=metric
).fit_transform(embeddings)
def local_cluster_embeddings(
embeddings: np.ndarray, dim: int, num_neighbors: int = 10, metric: str = "cosine"
) -> np.ndarray:
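    """Reduce embeddings with UMAP using a small fixed neighborhood, suited
    to re-clustering points inside a single global cluster."""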
return umap.UMAP(
n_neighbors=num_neighbors, n_components=dim, metric=metric
).fit_transform(embeddings)
def get_optimal_clusters(
embeddings: np.ndarray, max_clusters: int = 50, random_state: int = 200
) -> int:
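    """Choose the GMM component count in [1, max_clusters) that minimizes
    the Bayesian Information Criterion (BIC) on the embeddings."""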
max_clusters = min(max_clusters, len(embeddings))
n_clusters = np.arange(1, max_clusters)
bics = []
for n in n_clusters:
gm = GaussianMixture(n_components=n, random_state=random_state)
gm.fit(embeddings)
bics.append(gm.bic(embeddings))
return n_clusters[np.argmin(bics)]
def GMM_cluster(embeddings: np.ndarray, threshold: float, random_state: int = 0):
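    """Fit a Gaussian Mixture Model with the BIC-optimal component count and
    soft-assign each embedding to every cluster whose posterior probability
    exceeds `threshold`. Returns the per-point label arrays and the cluster
    count."""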
    n_clusters = get_optimal_clusters(embeddings, random_state=200)
gm = GaussianMixture(n_components=n_clusters, random_state=random_state)
gm.fit(embeddings)
probs = gm.predict_proba(embeddings)
labels = [np.where(prob > threshold)[0] for prob in probs]
return labels, n_clusters
def perform_clustering(
embeddings: np.ndarray,
dim: int,
threshold: float,
) -> List[np.ndarray]:
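    """Two-stage clustering: global UMAP + GMM, then local UMAP + GMM inside
    each global cluster. Returns, for each embedding, an array of the local
    cluster IDs it belongs to (soft assignment allows more than one)."""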
if len(embeddings) <= dim + 1:
# Avoid clustering when there's insufficient data
return [np.array([0]) for _ in range(len(embeddings))]
# Global dimensionality reduction
reduced_embeddings_global = global_cluster_embeddings(embeddings, dim)
# Global clustering
global_clusters, n_global_clusters = GMM_cluster(
reduced_embeddings_global, threshold
)
all_local_clusters = [np.array([]) for _ in range(len(embeddings))]
total_clusters = 0
# Iterate through each global cluster to perform local clustering
for i in range(n_global_clusters):
# Extract embeddings belonging to the current global cluster
global_cluster_embeddings_ = embeddings[
np.array([i in gc for gc in global_clusters])
]
if len(global_cluster_embeddings_) == 0:
continue
if len(global_cluster_embeddings_) <= dim + 1:
# Handle small clusters with direct assignment
local_clusters = [np.array([0]) for _ in global_cluster_embeddings_]
n_local_clusters = 1
else:
# Local dimensionality reduction and clustering
reduced_embeddings_local = local_cluster_embeddings(
global_cluster_embeddings_, dim
)
local_clusters, n_local_clusters = GMM_cluster(
reduced_embeddings_local, threshold
)
# Assign local cluster IDs, adjusting for total clusters already processed
for j in range(n_local_clusters):
local_cluster_embeddings_ = global_cluster_embeddings_[
np.array([j in lc for lc in local_clusters])
]
indices = np.where(
(embeddings == local_cluster_embeddings_[:, None]).all(-1)
)[1]
for idx in indices:
all_local_clusters[idx] = np.append(
all_local_clusters[idx], j + total_clusters
)
total_clusters += n_local_clusters
return all_local_clusters
def embed(embd, texts):
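    """Embed a list of texts with the given embedding model and return the
    result as a single NumPy array."""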
text_embeddings = embd.embed_documents(texts)
text_embeddings_np = np.array(text_embeddings)
return text_embeddings_np
def embed_cluster_texts(embd, texts):
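    """Embed the texts, cluster the embeddings, and return a DataFrame with
    one row per text: the text, its embedding, and its cluster labels."""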
    text_embeddings_np = embed(embd, texts)  # Generate embeddings
cluster_labels = perform_clustering(
text_embeddings_np, 10, 0.1
) # Perform clustering on the embeddings
df = pd.DataFrame() # Initialize a DataFrame to store the results
df["text"] = texts # Store original texts
df["embd"] = list(text_embeddings_np) # Store embeddings as a list in the DataFrame
df["cluster"] = cluster_labels # Store cluster labels
return df
def fmt_txt(df: pd.DataFrame) -> str:
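    """Join a cluster's texts into one string with a visible delimiter, for
    use as summarization context."""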
unique_txt = df["text"].tolist()
return "--- --- \n --- --- ".join(unique_txt)
def embed_cluster_summarize_texts(
    model, embd, texts: List[str], level: int
) -> Tuple[pd.DataFrame, pd.DataFrame]:
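    """Cluster `texts`, summarize each cluster with the LLM, and return both
    the per-text cluster DataFrame and the per-cluster summary DataFrame."""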
    df_clusters = embed_cluster_texts(embd, texts)
# Prepare to expand the DataFrame for easier manipulation of clusters
expanded_list = []
# Expand DataFrame entries to document-cluster pairings for straightforward processing
for index, row in df_clusters.iterrows():
for cluster in row["cluster"]:
expanded_list.append(
{"text": row["text"], "embd": row["embd"], "cluster": cluster}
)
# Create a new DataFrame from the expanded list
expanded_df = pd.DataFrame(expanded_list)
# Retrieve unique cluster identifiers for processing
all_clusters = expanded_df["cluster"].unique()
# Summarization
template = """Bạn là một chatbot hỗ trợ tuyển sinh và sinh viên đại học, hãy tóm tắt chi tiết tài liệu quy chế dưới đây.
Đảm bảo rằng nội dung tóm tắt giúp người dùng hiểu rõ các quy định và quy trình liên quan đến tuyển sinh hoặc đào tạo tại đại học.
Tài liệu:
{context}
"""
prompt = ChatPromptTemplate.from_template(template)
chain = prompt | model | StrOutputParser()
summaries = []
for i in all_clusters:
df_cluster = expanded_df[expanded_df["cluster"] == i]
formatted_txt = fmt_txt(df_cluster)
summaries.append(chain.invoke({"context": formatted_txt}))
df_summary = pd.DataFrame(
{
"summaries": summaries,
"level": [level] * len(summaries),
"cluster": list(all_clusters),
}
)
return df_clusters, df_summary
def recursive_embed_cluster_summarize(
    model, embd, texts: List[str], level: int = 1, n_levels: int = 3
) -> Dict[int, Tuple[pd.DataFrame, pd.DataFrame]]:
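    """Recursively embed, cluster, and summarize: each level re-clusters the
    previous level's summaries, stopping at `n_levels` or when only one
    cluster remains. Returns {level: (df_clusters, df_summary)}."""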
results = {}
    df_clusters, df_summary = embed_cluster_summarize_texts(model, embd, texts, level)
results[level] = (df_clusters, df_summary)
unique_clusters = df_summary["cluster"].nunique()
if level < n_levels and unique_clusters > 1:
new_texts = df_summary["summaries"].tolist()
        next_level_results = recursive_embed_cluster_summarize(
            model, embd, new_texts, level + 1, n_levels
        )
results.update(next_level_results)
return results
st.title("Chat with AskUSTH")
if "gemini_api" not in st.session_state:
st.session_state.gemini_api = None
if "rag" not in st.session_state:
st.session_state.rag = None
if "llm" not in st.session_state:
st.session_state.llm = None
@st.cache_resource
def get_chat_google_model(api_key):
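    """Create (and cache) a Gemini chat model using the supplied API key."""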
os.environ["GOOGLE_API_KEY"] = api_key
return ChatGoogleGenerativeAI(
model="gemini-1.5-flash",
temperature=0,
max_tokens=None,
timeout=None,
max_retries=2,
)
@st.cache_resource
def get_embedding_model():
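    """Load (and cache) the Vietnamese bi-encoder sentence-embedding model on CPU."""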
model_name = "bkai-foundation-models/vietnamese-bi-encoder"
model_kwargs = {'device': 'cpu'}
encode_kwargs = {'normalize_embeddings': False}
model = HuggingFaceEmbeddings(
model_name=model_name,
model_kwargs=model_kwargs,
encode_kwargs=encode_kwargs
)
return model
if "embd" not in st.session_state:
st.session_state.embd = get_embedding_model()
if "save_dir" not in st.session_state:
st.session_state.save_dir = None
if "uploaded_files" not in st.session_state:
st.session_state.uploaded_files = set()
@st.dialog("Setup Gemini")
def setup_gemini():
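    """Modal dialog that asks for a Gemini API key and stores it in session state."""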
st.markdown(
"""
        To use Google Gemini, you need to provide an API key. Create your key [here](https://ai.google.dev/gemini-api/docs/get-started/tutorial?lang=python&hl=vi) and paste it below.
"""
)
key = st.text_input("Key:", "")
if st.button("Save"):
st.session_state.gemini_api = key
st.rerun()
if st.session_state.gemini_api is None:
    setup_gemini()
else:
os.environ["GOOGLE_API_KEY"] = st.session_state.gemini_api
st.session_state.model = get_chat_google_model(st.session_state.gemini_api)
st.write(f"Key is set to: {st.session_state.gemini_api}")
st.write(f"loaded vietnamese-bi-encoder")
if st.session_state.save_dir is None:
save_dir = "./Documents"
if not os.path.exists(save_dir):
os.makedirs(save_dir)
st.session_state.save_dir = save_dir
def load_txt(file_path):
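    """Load a UTF-8 text file into a list of LangChain Documents."""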
loader_sv = TextLoader(file_path=file_path, encoding="utf-8")
doc = loader_sv.load()
return doc
with st.sidebar:
    uploaded_files = st.file_uploader("Choose TXT files", accept_multiple_files=True, type=["txt"])
if st.session_state.gemini_api:
if uploaded_files:
documents = []
uploaded_file_names = set()
new_docs = False
            for uploaded_file in uploaded_files:
                uploaded_file_names.add(uploaded_file.name)
                file_path = os.path.join(st.session_state.save_dir, uploaded_file.name)
                # Save files that have not been seen before and mark that the
                # RAG chain needs rebuilding.
                if uploaded_file.name not in st.session_state.uploaded_files:
                    with open(file_path, mode='wb') as w:
                        w.write(uploaded_file.getvalue())
                    new_docs = True
                # Load every currently uploaded file, not only the new ones,
                # so the rebuilt index covers the full document set.
                documents.extend(load_txt(file_path))
if new_docs:
st.session_state.uploaded_files = uploaded_file_names
st.session_state.rag = None
else:
st.session_state.uploaded_files = set()
st.session_state.rag = None
def format_docs(docs):
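    """Concatenate retrieved documents into a single context string."""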
return "\n\n".join(doc.page_content for doc in docs)
@st.cache_resource
def compute_rag_chain(model, embd, docs_texts):
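    """Build the retrieval chain: recursively cluster and summarize the
    documents (RAPTOR-style), index the originals together with all summary
    levels in Chroma, and compose retriever, prompt, model, and parser."""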
results = recursive_embed_cluster_summarize(model, embd, docs_texts, level=1, n_levels=3)
all_texts = docs_texts.copy()
for level in sorted(results.keys()):
summaries = results[level][1]["summaries"].tolist()
all_texts.extend(summaries)
vectorstore = Chroma.from_texts(texts=all_texts, embedding=embd)
retriever = vectorstore.as_retriever()
template = """
Bạn là một trợ lí AI hỗ trợ tuyển sinh và sinh viên. \n
Hãy trả lời câu hỏi chính xác, tập trung vào thông tin liên quan đến câu hỏi. \n
Nếu bạn không biết câu trả lời, hãy nói không biết, đừng cố tạo ra câu trả lời.\n
Dưới đây là thông tin liên quan mà bạn cần sử dụng tới:\n
{context}\n
hãy trả lời:\n
{question}
"""
prompt = PromptTemplate(template=template, input_variables=["context", "question"])
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| model
| StrOutputParser()
)
return rag_chain
if st.session_state.uploaded_files:
if st.session_state.gemini_api is not None:
        with st.spinner("Processing, please wait..."):
if st.session_state.rag is None:
docs_texts = [d.page_content for d in documents]
st.session_state.rag = compute_rag_chain(st.session_state.model, st.session_state.embd, docs_texts)
if st.session_state.gemini_api is not None:
if st.session_state.llm is None:
mess = ChatPromptTemplate.from_messages(
[
(
"system",
"Bản là một trợ lí AI hỗ trợ tuyển sinh và sinh viên",
),
("human", "{input}"),
]
)
chain = mess | st.session_state.model
st.session_state.llm = chain
if "chat_history" not in st.session_state:
st.session_state.chat_history = []
for message in st.session_state.chat_history:
with st.chat_message(message["role"]):
st.write(message["content"])
prompt = st.chat_input("What would you like to ask?")
if st.session_state.gemini_api:
if prompt:
st.session_state.chat_history.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.write(prompt)
with st.chat_message("assistant"):
if st.session_state.rag is not None:
respone = st.session_state.rag.invoke(prompt)
st.write(respone)
else:
ans = st.session_state.llm.invoke(prompt)
respone = ans.content
st.write(respone)
st.session_state.chat_history.append({"role": "assistant", "content": respone})