import logging
import os

import gradio as gr
from git import Repo
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from langchain_pinecone import PineconeVectorStore
from langchain.schema import Document
from tree_sitter_languages import get_parser
from pinecone import Pinecone
import openai
import numpy as np

# Load environment variables
load_dotenv()

# Logging Configuration
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Environment Variables
CLONE_DIR = "./cloned_repos"
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX_KEY = "codebase-app"
GROQ_API_KEY = os.getenv("GROQ_API_KEY")

# Initialize GROQ API (OpenAI-compatible endpoint)
client = openai.OpenAI(
    base_url="https://api.groq.com/openai/v1",
    api_key=GROQ_API_KEY,
)

# Initialize Pinecone
pinecone_client = Pinecone(api_key=PINECONE_API_KEY)
pinecone_index = pinecone_client.Index(PINECONE_INDEX_KEY)

# Initialize SentenceTransformer Embedding Model
embedding_model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")

# Supported Extensions
SUPPORTED_EXTENSIONS = {".py", ".java", ".js", ".ts", ".cpp", ".h", ".ipynb"}
IGNORED_DIRS = {"node_modules", "venv", "env", ".git", "__pycache__"}

# Grammar name handed to get_parser() for each extension. Extensions listed in
# SUPPORTED_EXTENSIONS but absent here (".h", ".ipynb") have no grammar mapping
# and are skipped by parse_repository.
_LANGUAGE_BY_EXTENSION = {
    ".py": "python",
    ".ts": "typescript",
    ".js": "javascript",
    ".java": "java",
    ".cpp": "cpp",
}

# System prompt
system_prompt = (
    "You are a Senior Software engineer with more than 20 years of experience "
    "delivering software for massive use. You are very technical and have "
    "complete expertise over all domains of software in all aspects. Answer "
    "any questions I have about the codebase, based on the code provided. "
    "Always consider all of the context provided when forming a response. \n"
)


def _repo_name_from_url(repo_url: str) -> str:
    """Return the repository name encoded in *repo_url*.

    Surrounding whitespace and trailing slashes are ignored and only a
    trailing ".git" suffix is removed, so a name such as "my.github.io" is
    kept intact.

    Raises:
        ValueError: if no name can be derived (e.g. an empty URL).
    """
    name = repo_url.strip().rstrip("/").split("/")[-1]
    if name.endswith(".git"):
        name = name[: -len(".git")]
    if not name:
        raise ValueError(f"Cannot derive a repository name from URL: {repo_url!r}")
    return name


# Backend Logic: Clone Repository
def clone_repository(repo_url: str) -> str:
    """Clone the GitHub repository locally.

    Args:
        repo_url: URL of the repository to clone.

    Returns:
        Path of the local checkout under CLONE_DIR. When that path already
        exists it is returned as-is and nothing is cloned.

    Raises:
        ValueError: if no repository name can be derived from *repo_url*.
    """
    repo_name = _repo_name_from_url(repo_url)
    repo_path = os.path.join(CLONE_DIR, repo_name)

    os.makedirs(CLONE_DIR, exist_ok=True)

    if os.path.exists(repo_path):
        logger.info(f"Repository already exists: {repo_path}")
        return repo_path

    Repo.clone_from(repo_url.strip(), repo_path)
    logger.info(f"Cloned repository to: {repo_path}")
    return repo_path


# Backend Logic: Parse Repository
class SimpleTreeSitterParser:
    """Parser for extracting code chunks from files."""

    def __init__(self, language: str):
        """Create a parser for *language*.

        Raises:
            ValueError: if get_parser() fails for *language*.
        """
        self.language = language
        try:
            self.parser = get_parser(language)
        except Exception as e:
            logger.error(f"Error initializing parser for {language}: {e}")
            raise ValueError(f"Parser error for {language}: {e}") from e

    def parse(self, code: str) -> list:
        """Split *code* into one chunk per child of the syntax-tree root.

        Returns:
            A list of dicts with keys "type", "content", "start_line" and
            "end_line" (1-based lines). An empty list is returned when
            parsing fails; the error is logged.
        """
        try:
            source = code.encode("utf-8")
            tree = self.parser.parse(source)
            # start_byte/end_byte are offsets into the encoded bytes, so the
            # slice must be taken on `source`, not on the str `code`;
            # otherwise chunks drift as soon as a non-ASCII character appears.
            return [
                {
                    "type": child.type,
                    "content": source[child.start_byte:child.end_byte].decode(
                        "utf-8", errors="replace"
                    ),
                    "start_line": child.start_point[0] + 1,
                    "end_line": child.end_point[0] + 1,
                }
                for child in tree.root_node.children
            ]
        except Exception as e:
            logger.error(f"Error parsing code: {e}")
            return []


def parse_repository(repo_path: str) -> list:
    """Parse repository files into meaningful chunks.

    Walks *repo_path*, skipping directories named in IGNORED_DIRS and files
    whose extension is not in SUPPORTED_EXTENSIONS, and returns the
    concatenated chunks produced by SimpleTreeSitterParser.parse.
    """
    chunks = []
    parsers = {}  # language -> parser, or None when initialisation failed

    for root, dirs, files in os.walk(repo_path):
        # Prune by exact directory name so ignored trees are never entered.
        # A substring test on the full path would also drop unrelated
        # directories such as "environment" (contains "env").
        dirs[:] = [d for d in dirs if d not in IGNORED_DIRS]

        for file in files:
            ext = os.path.splitext(file)[1]
            if ext not in SUPPORTED_EXTENSIONS:
                logger.warning(f"Skipping unsupported file: {file}")
                continue

            file_path = os.path.join(root, file)
            language = _LANGUAGE_BY_EXTENSION.get(ext)
            if language is None:
                logger.warning(f"No parser configured for {ext}; skipping {file_path}")
                continue

            try:
                logger.info(f"Processing file: {file_path}")
                code = get_file_content(file_path)
                if not code:
                    logger.warning(f"No content found in {file_path}")
                    continue

                # Build each language's parser once and reuse it.
                if language not in parsers:
                    try:
                        parsers[language] = SimpleTreeSitterParser(language)
                    except ValueError:
                        parsers[language] = None
                        raise
                parser = parsers[language]
                if parser is None:
                    # Initialisation already failed (and was logged) earlier.
                    continue

                chunks.extend(parser.parse(code))
            except ValueError as ve:
                logger.error(f"Skipping file {file_path} due to parser error: {ve}")
            except Exception as e:
                logger.error(f"Unexpected error processing {file_path}: {e}")

    return chunks


# Helper: Read File Content
def get_file_content(file_path: str) -> str:
    """Read and return the content of a file.

    Returns an empty string (and logs the error) when the file cannot be
    read or decoded as UTF-8.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()
    except Exception as e:
        logger.error(f"Error reading file {file_path}: {e}")
        return ""


# Backend Logic: Store Embeddings
def store_embeddings(documents, namespace="default", batch_size=100):
    """Store embeddings in Pinecone.

    Args:
        documents: objects exposing `page_content` and `metadata`.
        namespace: Pinecone namespace to upsert into.
        batch_size: number of vectors sent per upsert request.

    Raises:
        ValueError: if *batch_size* is smaller than 1.
        Exception: any embedding/upsert error is logged and re-raised.
    """
    try:
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        if not documents:
            logger.info(f"No documents to store in Pinecone namespace '{namespace}'.")
            return

        texts = [doc.page_content for doc in documents]
        embeddings = embedding_model.encode(texts, show_progress_bar=True)
        vectors = [
            {
                "id": str(i),
                "values": embedding.tolist(),
                "metadata": {"text": doc.page_content, **doc.metadata},
            }
            for i, (doc, embedding) in enumerate(zip(documents, embeddings))
        ]

        # Send in batches: a single request carrying every vector of a large
        # repository can exceed the service's request-size limit.
        for start in range(0, len(vectors), batch_size):
            pinecone_index.upsert(
                vectors=vectors[start:start + batch_size],
                namespace=namespace,
            )
        logger.info(f"Stored {len(vectors)} embeddings in Pinecone namespace '{namespace}'.")
    except Exception as e:
        logger.error(f"Error storing embeddings: {e}")
        raise


# Backend Logic: Perform RAG
def perform_rag(query: str, namespace="default") -> str:
    """Retrieve context and generate responses.

    Embeds *query*, fetches the 10 closest matches from *namespace*, prepends
    their stored text to the query and asks the chat model for an answer.
    Returns the answer text, "No relevant context found." when there are no
    matches, or "Error: ..." when anything fails.
    """
    try:
        query_embedding = embedding_model.encode(query).tolist()
        response = pinecone_index.query(
            vector=query_embedding,
            top_k=10,
            include_metadata=True,
            namespace=namespace,
        )
        if not response.get('matches'):
            return "No relevant context found."

        contexts = [match['metadata'].get('text', '') for match in response['matches']]
        augmented_query = (
            "\n" + "\n\n-------\n\n".join(contexts) + "\n-------\n\n\n" + query
        )

        llm_response = client.chat.completions.create(
            model="llama-3.1-8b-instant",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": augmented_query},
            ],
        )
        return llm_response.choices[0].message.content
    except Exception as e:
        logger.error(f"Error performing RAG: {e}")
        return f"Error: {e}"


def _ingest_repo(repo_url: str):
    """Clone, parse and index *repo_url*.

    Returns:
        A `(namespace, message)` tuple. `namespace` is the namespace the
        embeddings were stored in, or None when nothing was stored;
        `message` is the user-facing status text.
    """
    try:
        namespace = _repo_name_from_url(repo_url)
        repo_path = clone_repository(repo_url)
        chunks = parse_repository(repo_path)
        if not chunks:
            return None, "No valid chunks found in the repository."

        documents = [
            Document(page_content=chunk["content"], metadata={"repo_url": repo_url})
            for chunk in chunks
        ]
        store_embeddings(documents, namespace=namespace)
        return namespace, f"Repository processed successfully in namespace '{namespace}'!"
    except Exception as e:
        logger.error(f"Error processing repository: {e}")
        return None, f"Error: {e}"


# Process Repository
def process_repo(repo_url: str) -> str:
    """Clone, parse, and store embeddings for a repository.

    Returns a human-readable status message; failures are reported as
    "Error: ..." instead of being raised.
    """
    return _ingest_repo(repo_url)[1]


# Fetch Namespaces
def fetch_namespaces():
    """Retrieve namespaces from Pinecone.

    Returns the namespace names reported by the index statistics, or an
    empty list when the request fails.
    """
    try:
        stats = pinecone_index.describe_index_stats()
        return list(stats.get("namespaces", {}).keys())
    except Exception as e:
        logger.error(f"Error fetching namespaces: {e}")
        return []


# Gradio UI
def create_ui():
    """Build and return the Gradio Blocks application."""
    namespaces = fetch_namespaces()

    # NOTE(review): line breaks inside this text are reconstructed; the
    # wording is unchanged. Step 2 describes a workaround that should no
    # longer be needed now that the clone handler adds the new namespace to
    # the dropdown itself -- TODO update the text once confirmed.
    instructions = "\n".join([
        "**Instructions:**",
        "1. Enter the GitHub repository URL you wish to clone and click **Git Clone 😺**.",
        "2. After cloning, to see the new repository appear in the namespace dropdown, "
        "type any character into the URL box and click **Git Clone 😺** again.",
        "3. Select the desired namespace from the dropdown.",
        "4. Use the chatbot below to interact with the selected codebase.",
        "",
        "(Sorry for this I'm currently trying to solve this bug, feel free to se the "
        "code if you can spot the issue 🙂‍↕️)",
    ])

    with gr.Blocks() as demo:
        namespace_state = gr.State(value=None)
        chat_history = gr.State(value=[])

        # NOTE(review): widget nesting below is reconstructed -- confirm layout.
        with gr.Column():
            gr.Markdown("## Codebase Chat App with Repository Management")
            gr.Markdown(instructions)

            with gr.Row():
                repo_url_input = gr.Textbox(
                    label="GitHub Repository URL",
                    placeholder="Enter repo URL to clone",
                )
                clone_button = gr.Button("Git Clone 😺")

            clone_status = gr.Textbox(label="Clone Status", interactive=False)
            namespace_dropdown = gr.Dropdown(
                choices=namespaces, label="Namespace", interactive=True
            )
            chatbot = gr.Chatbot(label="Codebase Chatbot", type="messages")
            message_input = gr.Textbox(placeholder="Enter your message here...")
            send_button = gr.Button("Send")

        def update_namespace_or_clone(repo_url, current_namespace, history):
            """Clone repository and update namespaces."""
            history = history or []
            if not repo_url or not repo_url.strip():
                # Leave the conversation untouched. Writing the namespace
                # value into the history state would turn it into None and
                # break the next query.
                return (
                    gr.update(),
                    "Please provide a repository URL.",
                    history,
                    history,
                    current_namespace,
                )

            new_namespace, message = _ingest_repo(repo_url)
            updated_namespaces = fetch_namespaces()
            # Index statistics may not list a namespace right after its first
            # upsert, so add the one that was just written explicitly.
            if new_namespace and new_namespace not in updated_namespaces:
                updated_namespaces.append(new_namespace)

            return (
                gr.update(choices=updated_namespaces, value=None),
                message,
                [],  # Clear the chatbot display
                [],  # Clear chat history
                None,
            )

        def handle_query(message, history, namespace):
            """Handle chatbot queries."""
            history = history or []
            if not namespace:
                new_history = history + [
                    {"role": "assistant", "content": "Please select a namespace first!"}
                ]
                return new_history, new_history, gr.update(value="")

            if not message or not message.strip():
                # Nothing to ask; do not spend a retrieval/LLM call on it.
                return history, history, gr.update(value="")

            response = perform_rag(message, namespace)
            formatted_history = history + [
                {"role": "user", "content": message},
                {"role": "assistant", "content": response},
            ]
            return formatted_history, formatted_history, gr.update(value="")

        # Bind clone button
        clone_button.click(
            update_namespace_or_clone,
            inputs=[repo_url_input, namespace_state, chat_history],
            outputs=[
                namespace_dropdown,
                clone_status,
                chatbot,
                chat_history,
                namespace_state,
            ],
        )

        # Bind query button
        send_button.click(
            handle_query,
            inputs=[message_input, chat_history, namespace_dropdown],
            outputs=[chatbot, chat_history, message_input],
        )

    return demo


if __name__ == "__main__":
    app = create_ui()
    app.launch()