import os
import json
import gradio as gr
import zipfile
import tempfile
import requests
import urllib.parse
import io
from langchain_community.vectorstores import Chroma
from huggingface_hub import HfApi, login
from PyPDF2 import PdfReader
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_groq import ChatGroq
from dotenv import load_dotenv
from langchain.schema import Document
from chunk_python_code import chunk_python_code_with_metadata
from vectorstore import get_chroma_vectorstore
# Load environment variables from .env file
load_dotenv()
# Load configuration from JSON file
with open('config.json') as config_file:
config = json.load(config_file)
with open("config2.json", "r") as file:
config2 = json.load(file)
PERSIST_DOC_DIRECTORY = config["persist_doc_directory"]
PERSIST_CODE_DIRECTORY = config["persist_code_directory"]
CHUNK_SIZE = config["chunk_size"]
CHUNK_OVERLAP = config["chunk_overlap"]
EMBEDDING_MODEL_NAME = config["embedding_model"]
LLM_MODEL_NAME = config["llm_model"]
LLM_TEMPERATURE = config["llm_temperature"]
GITLAB_API_URL = config["gitlab_api_url"]
HF_SPACE_NAME = config["hf_space_name"]
DATA_DIR = config["data_dir"]
GROQ_API_KEY = os.environ["GROQ_API_KEY"]
HF_TOKEN = os.environ["HF_Token"]
login(HF_TOKEN)
api = HfApi()
def load_project_id(json_file):
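    """Read the GitLab project ID from a JSON file (expects a top-level 'project_id' key)."""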
with open(json_file, 'r') as f:
data = json.load(f)
return data['project_id']
def download_gitlab_project_by_version():
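    """Download a specific release of the GitLab project configured in config2.json
    and upload the zip archive to this Hugging Face Space's data directory."""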
    try:
        # Extract the GitLab project information from the loaded configuration (config2.json)
        api_url = config2['gitlab']['api_url']
        project_id = urllib.parse.quote(config2['gitlab']['project']['id'], safe="")
        version = config2['gitlab']['project']['version']

        # Construct the URL for the release's zip archive
        url = f"{api_url}/projects/{project_id}/repository/archive.zip?sha={version}"

        # Send a GET request to download the zip file
        response = requests.get(url, stream=True)

        # Check if the request was successful
        if response.status_code == 200:
            archive_bytes = io.BytesIO(response.content)

            # Extract the filename from the content-disposition header,
            # falling back to a version-based default if the header is missing
            content_disposition = response.headers.get("content-disposition")
            if content_disposition and "filename=" in content_disposition:
                filename = content_disposition.split("filename=")[-1].strip('"')
            else:
                filename = f"archive-{version}.zip"

            target_path = f"{DATA_DIR}/{filename}"
            api.upload_file(
                path_or_fileobj=archive_bytes,
                path_in_repo=target_path,
                repo_id=HF_SPACE_NAME,
                repo_type='space'
            )
            print(f"Release {version} downloaded successfully as {target_path}.")
        else:
            print(f"Failed to download the release: {response.status_code} - {response.reason}")
            print(response.text)
    except KeyError as e:
        print(f"Missing key in config2.json: {e}")
    except Exception as e:
        print(f"An error occurred: {e}")
def download_gitlab_repo():
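    """Download the archive of the GitLab repository listed in repository_ids.json
    and upload it to the Hugging Face Space, skipping the upload if the file
    already exists there."""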
print("Start the upload_gitRepository function")
project_id = load_project_id('repository_ids.json')
encoded_project_id = urllib.parse.quote_plus(project_id)
# Define the URL to download the repository archive
archive_url = f"{GITLAB_API_URL}/projects/{encoded_project_id}/repository/archive.zip"
    # Download the repository archive and fail fast on HTTP errors
    response = requests.get(archive_url)
    response.raise_for_status()
    archive_bytes = io.BytesIO(response.content)
# Retrieve the original file name from the response headers
content_disposition = response.headers.get('content-disposition')
if content_disposition:
filename = content_disposition.split('filename=')[-1].strip('\"')
else:
filename = 'archive.zip' # Fallback to a default name if not found
# Check if the file already exists in the repository
existing_files = api.list_repo_files(repo_id=HF_SPACE_NAME, repo_type='space')
target_path = f"{DATA_DIR}/{filename}"
print(f"Target Path: '{target_path}'")
print(f"Existing Files: {[repr(file) for file in existing_files]}")
if target_path in existing_files:
print(f"File '{target_path}' already exists in the repository. Skipping upload...")
else:
# Upload the ZIP file to the new folder in the Hugging Face space repository
print("Uploading File to directory:")
print(f"Archive Bytes: {repr(archive_bytes.getvalue())[:100]}") # Show a preview of bytes
print(f"Target Path in Repo: '{target_path}'")
api.upload_file(
path_or_fileobj=archive_bytes,
path_in_repo=target_path,
repo_id=HF_SPACE_NAME,
repo_type='space'
)
print("Upload complete")
def get_all_files_in_folder(temp_dir, folder_path):
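    """Recursively collect the paths of all files under temp_dir/folder_path."""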
all_files = []
target_dir = os.path.join(temp_dir, folder_path)
for root, dirs, files in os.walk(target_dir):
print(f"Files in current directory ({root}): {files}")
for file in files:
print(f"Processing file: {file}")
all_files.append(os.path.join(root, file))
return all_files
def get_file(temp_dir, file_path):
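    """Return the full path of file_path inside temp_dir."""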
full_path = os.path.join(temp_dir, file_path)
return full_path
def process_directory(directory, folder_paths, file_paths):
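    """Unzip the archive found in `directory` and read all .rst and .py files from
    the given folder and file paths, returning their contents and relative paths."""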
all_texts = []
file_references = []
    zip_filename = next((file for file in os.listdir(directory) if file.endswith('.zip')), None)
    if zip_filename is None:
        raise FileNotFoundError(f"No zip archive found in {directory}")
    zip_file_path = os.path.join(directory, zip_filename)
with tempfile.TemporaryDirectory() as tmpdirname:
# Unzip the file into the temporary directory
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
zip_ref.extractall(tmpdirname)
        files = []
        print("tmpdirname:", tmpdirname)
        unzipped_root = os.listdir(tmpdirname)
        print("unzipped_root:", unzipped_root)
        # The archive extracts into a single top-level directory
        tmpsubdirpath = os.path.join(tmpdirname, unzipped_root[0])
        print("tmpsubdirpath:", tmpsubdirpath)
if folder_paths:
for folder_path in folder_paths:
files += get_all_files_in_folder(tmpsubdirpath, folder_path)
if file_paths:
files += [get_file(tmpsubdirpath, file_path) for file_path in file_paths]
print(f"Total number of files: {len(files)}")
        for file_path in files:
            file_ext = os.path.splitext(file_path)[1]
            if os.path.getsize(file_path) == 0:
                print(f"Skipping an empty file: {file_path}")
                continue
            # Only ingest .rst documentation and .py source files
            if file_ext in ['.rst', '.py']:
                with open(file_path, 'rb') as f:
                    text = f.read().decode('utf-8')
                all_texts.append(text)
                relative_path = os.path.relpath(file_path, tmpsubdirpath)
                print("File path:", file_path)
                print("Relative file path:", relative_path)
                file_references.append(relative_path)
return all_texts, file_references
def split_python_code_into_chunks(texts, file_paths):
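    """Chunk Python source files with code-aware metadata via chunk_python_code_with_metadata."""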
chunks = []
for text, file_path in zip(texts, file_paths):
document_chunks = chunk_python_code_with_metadata(text, file_path)
chunks.extend(document_chunks)
return chunks
# Split text into chunks
def split_into_chunks(texts, references, chunk_size, chunk_overlap):
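    """Split documentation texts into overlapping chunks, attaching the source file as metadata."""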
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
chunks = []
for text, reference in zip(texts, references):
chunks.extend([
Document(
page_content=chunk,
metadata={
"source": reference,
"usage": "doc"
}
)
for chunk in text_splitter.split_text(text)
])
return chunks
# Setup Vectorstore
def embed_documents_into_vectorstore(chunks, model_name, persist_directory):
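    """Embed the chunks with a Hugging Face embedding model and add them to a persistent Chroma vectorstore."""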
print("Start setup_vectorstore_function")
embedding_model = HuggingFaceEmbeddings(model_name=model_name)
vectorstore = get_chroma_vectorstore(embedding_model, persist_directory)
vectorstore.add_documents(chunks)
return vectorstore
# Setup LLM
def setup_llm(model_name, temperature, api_key):
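    """Instantiate the Groq-hosted chat model with the configured temperature."""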
llm = ChatGroq(model=model_name, temperature=temperature, api_key=api_key)
return llm
def format_kadi_apy_library_context(docs):
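    """Format retrieved code chunks, prefixing each with its source, class, and type metadata."""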
doc_context = []
for doc in docs:
# Extract metadata information
class_info = doc.metadata.get("class", "Unknown Class")
type_info = doc.metadata.get("type", "Unknown Type")
        source_info = doc.metadata.get("source", "Unknown Source")
formatted_doc = f"# source: {source_info}\n# class: {class_info}\n# type: {type_info}\n{doc.page_content}\n\n\n"
doc_context.append(formatted_doc)
return doc_context
def format_kadi_api_doc_context(docs):
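    """Format retrieved documentation chunks, prefixing each with its source file."""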
doc_context = []
for doc in docs:
        source_info = doc.metadata.get("source", "Unknown Source")
formatted_doc = f"# source: {source_info}\n{doc.page_content}\n\n\n"
doc_context.append(formatted_doc)
return doc_context
def rag_workflow(query):
"""
RAGChain class to perform the complete RAG workflow.
"""
# Assume 'llm' and 'vector_store' are already initialized instances
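    # NOTE: RAGChain is not imported in this file; it is assumed to be defined
    # in or imported from another module of this project.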
rag_chain = RAGChain(llm, vector_store)
# Step 1: Predict which library usage is relevant
library_usage_prediction = rag_chain.predict_library_usage(query)
print(f"Predicted library usage: {library_usage_prediction}")
# Step 2: Retrieve contexts (documents and code snippets)
doc_contexts, code_contexts = rag_chain.retrieve_contexts(query, library_usage_prediction)
print("Retrieved Document Contexts:", doc_contexts)
print("Retrieved Code Contexts:", code_contexts)
# Step 3: Format the contexts
formatted_doc_context, formatted_code_context = rag_chain.format_context(doc_contexts, code_contexts)
print("Formatted Document Contexts:", formatted_doc_context)
print("Formatted Code Contexts:", formatted_code_context)
# Step 4: Generate the final response
response = rag_chain.generate_response(query, formatted_doc_context, formatted_code_context)
print("Generated Response:", response)
return response
def get_chroma_vectorstore2(embedding_model):
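    """Initialize a Chroma vectorstore persisted under /home/user/data."""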
# Define the persist_directory path
vectorstore_path = "/home/user/data"
# Ensure the directory exists
os.makedirs(vectorstore_path, exist_ok=True) # Creates it if it doesn't exist
print(f"Using persist_directory: {vectorstore_path}")
# Initialize the Chroma vectorstore with the specified persist_directory
vectorstore = Chroma(persist_directory=vectorstore_path, embedding_function=embedding_model)
return vectorstore
def initialize():
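    """Download the configured KadiAPY release, chunk its code and docs, embed the
    chunks into the vectorstore, and set up the LLM (stored as module-level globals)."""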
global vector_store, chunks, llm
download_gitlab_project_by_version()
code_folder_paths = ['kadi_apy']
doc_folder_paths = ['docs/source/']
code_texts, code_references = process_directory(DATA_DIR, code_folder_paths, [])
print("LEEEEEEEEEEEENGTH of code_texts: ", len(code_texts))
doc_texts, kadiAPY_doc_references = process_directory(DATA_DIR, doc_folder_paths, [])
print("LEEEEEEEEEEEENGTH of doc_files: ", len(doc_texts))
code_chunks = split_python_code_into_chunks(code_texts, code_references)
doc_chunks = split_into_chunks(doc_texts, kadiAPY_doc_references, CHUNK_SIZE, CHUNK_OVERLAP)
print(f"Total number of code_chunks: {len(code_chunks)}")
print(f"Total number of doc_chunks: {len(doc_chunks)}")
filename = "test"
vector_store = embed_documents_into_vectorstore(doc_chunks + code_chunks, EMBEDDING_MODEL_NAME, f"{DATA_DIR}/{filename}")
llm = setup_llm(LLM_MODEL_NAME, LLM_TEMPERATURE, GROQ_API_KEY)
initialize()
# Gradio utils
def check_input_text(text):
if not text:
gr.Warning("Please input a question.")
        raise TypeError("Question must not be empty.")
return True
def add_text(history, text):
history = history + [(text, None)]
yield history, ""
def bot_kadi(history):
user_query = history[-1][0]
response = rag_workflow(user_query)
history[-1] = (user_query, response)
yield history
def main():
with gr.Blocks() as demo:
gr.Markdown("## KadiAPY - AI Coding-Assistant")
gr.Markdown("AI assistant for KadiAPY based on RAG architecture powered by LLM")
with gr.Tab("KadiAPY - AI Assistant"):
with gr.Row():
with gr.Column(scale=10):
chatbot = gr.Chatbot([], elem_id="chatbot", label="Kadi Bot", bubble_full_width=False, show_copy_button=True, height=600)
user_txt = gr.Textbox(label="Question", placeholder="Type in your question and press Enter or click Submit")
with gr.Row():
with gr.Column(scale=1):
submit_btn = gr.Button("Submit", variant="primary")
with gr.Column(scale=1):
clear_btn = gr.Button("Clear", variant="stop")
gr.Examples(
examples=[
"Who is working on Kadi4Mat?",
"How do i install the Kadi-Apy library?",
"How do i install the Kadi-Apy library for development?",
"I need a method to upload a file to a record",
],
inputs=user_txt,
outputs=chatbot,
fn=add_text,
label="Try asking...",
cache_examples=False,
examples_per_page=3,
)
user_txt.submit(check_input_text, user_txt, None).success(add_text, [chatbot, user_txt], [chatbot, user_txt]).then(bot_kadi, [chatbot], [chatbot])
submit_btn.click(check_input_text, user_txt, None).success(add_text, [chatbot, user_txt], [chatbot, user_txt]).then(bot_kadi, [chatbot], [chatbot])
clear_btn.click(lambda: None, None, chatbot, queue=False)
demo.launch()
if __name__ == "__main__":
main()