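"""RAG chatbot over the Mifos web-app codebase.

Indexes TypeScript, HTML, JS, Markdown, and text files into a persistent
Chroma vector store with OpenAI embeddings, then answers questions through
a RetrievalQA chain exposed via a Gradio interface.
"""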
from dotenv import load_dotenv
import os

load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

from langchain_text_splitters import RecursiveCharacterTextSplitter, Language
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
import chromadb
import gradio as gr


def read_file(file_path):
    with open(file_path, "r", encoding="utf-8") as file:
        return file.read()


def infer_module_name(file_path):
    # Treat the directories between "src" and the file name as the module path.
    path_parts = file_path.split(os.sep)
    if "src" in path_parts:
        src_index = path_parts.index("src")
        return "/".join(path_parts[src_index + 1:-1])
    return "root"


def process_files(root_dir, file_extension, language=None):
    # Use a language-aware splitter when one exists for the file type, so
    # chunks break on syntactic boundaries rather than arbitrary characters.
    if language:
        splitter = RecursiveCharacterTextSplitter.from_language(
            language=language, chunk_size=1000, chunk_overlap=100
        )
    else:
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000, chunk_overlap=100
        )

    all_docs = []

    for root, _, files in os.walk(root_dir):
        for file in files:
            if file.endswith(file_extension):
                file_path = os.path.join(root, file)
                file_name = os.path.basename(file_path)
                folder_path = root
                module_name = infer_module_name(file_path)

                # Prepend the file name and module path so every chunk carries
                # its provenance inside the embedded text itself.
                content = read_file(file_path)
                content = f"file name: {file_name}\npath: {module_name}\n{content}"

                docs = splitter.create_documents(
                    [content],
                    metadatas=[{
                        'source': file_name,
                        'type': file_extension[1:],
                        'module': module_name,
                        'folder_path': folder_path
                    }]
                )
                all_docs.extend(docs)

    return all_docs


def process_all_files(root_directory):
    ts_docs = process_files(root_directory, '.ts', Language.TS)
    html_docs = process_files(root_directory, '.html', Language.HTML)
    txt_docs = process_files(root_directory, '.txt')
    md_docs = process_files(root_directory, '.md')
    js_docs = process_files(root_directory, '.js', Language.JS)

    return ts_docs + html_docs + txt_docs + md_docs + js_docs
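
# To index additional file types, add one call per extension, e.g. (hypothetical):
# scss_docs = process_files(root_directory, '.scss')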


def initialize_or_load_database():
    embeddings = OpenAIEmbeddings(
        model='text-embedding-3-large',
        openai_api_key=os.environ.get('OPENAI_API_KEY')
    )

    chroma_client = chromadb.PersistentClient(path="./web_app_vector_storage_metadata")
    collection_name = "all_files"

    # collection_storage.txt acts as a marker: if it exists, the collection was
    # already built and persisted, so attach to it instead of re-embedding.
    if os.path.exists("collection_storage.txt"):
        print("Loading existing vector database...")
        docsearch = Chroma(
            client=chroma_client,
            collection_name=collection_name,
            embedding_function=embeddings
        )
    else:
        print("Creating new vector database...")
        root_directory = "web-app"
        all_documents = process_all_files(root_directory)
        print(f"Total number of chunks across all files: {len(all_documents)}")
        print("Total number of files:", len({doc.metadata['source'] for doc in all_documents}))

        docsearch = Chroma.from_documents(
            documents=all_documents,
            embedding=embeddings,
            collection_name=collection_name,
            client=chroma_client
        )

        # Look up the collection by name instead of assuming it is the only
        # one the client holds (list_collections()[0] breaks if others exist).
        collection = chroma_client.get_collection(collection_name)

        with open("collection_storage.txt", "w") as f:
            f.write(f"{collection.name}\n{collection.id}")

    return docsearch
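
# To force a full rebuild of the index, delete collection_storage.txt and the
# ./web_app_vector_storage_metadata directory before restarting the script.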


docsearch = initialize_or_load_database()

llm = ChatOpenAI(
    openai_api_key=os.environ.get('OPENAI_API_KEY'),
    model_name='gpt-4o-mini',
    temperature=0.3
)

qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=docsearch.as_retriever(),
    return_source_documents=True
)


def get_top_20_embeddings(query):
    docs_and_scores = docsearch.similarity_search_with_score(query, k=20)
    return docs_and_scores
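
# Not called in the Gradio flow; useful for manually inspecting retrieval
# over a wider candidate set.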


def get_top_5_embeddings(query):
    if "structure" in query.lower() or "codebase" in query.lower():
        return docsearch.similarity_search_with_score(query, k=10)
    return docsearch.similarity_search_with_score(query, k=5)
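
# Heuristic: structure-level questions (mentioning "structure" or "codebase")
# tend to need evidence from more files, so retrieval widens from 5 to 10 chunks.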


def answer_question(question):
    top_5_results = get_top_5_embeddings(question)
    context = "\n".join([doc.page_content for doc, _ in top_5_results])

    query_data = (
        "You are an expert in project structure and various file types including TypeScript, HTML, Markdown, and JS. "
        "When answering questions, focus on the file organization, key components of the codebase, and the structure of the project. "
        "For general queries, like 'hi' or 'hello', provide a brief answer; for questions about project structure, include module names, file paths, and folder organization. "
        "If you're unsure of the answer, suggest referring to the Mifos Slack Channel."
        "\nContext:\n" + context + "\n" + question
    )

    # Note: RetrievalQA runs its own retrieval over query_data as well; the
    # manually assembled context above supplements those documents.
    response = qa.invoke({"query": query_data})

    return response['result']


interface = gr.Interface(
    fn=answer_question,
    inputs=gr.Textbox(label="Ask a question about the files"),
    outputs=gr.Textbox(label="Answer"),
    title="Mifos Web-App Chatbot",
    description="Ask questions about the TypeScript, HTML, JS, Markdown, and text files in the Mifos Web-App."
)

if __name__ == "__main__":
    interface.launch()
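    # Pass share=True to interface.launch() for a temporary public URL, or
    # server_name="0.0.0.0" to serve on the local network (standard Gradio options).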