|
import streamlit as st |
|
import sqlite3 |
|
import hashlib |
|
import os |
|
import google.generativeai as genai |
|
import zipfile |
|
from git import Repo |
|
from transformers import AutoModelForCausalLM, AutoTokenizer |
|
import torch |
|
import requests |
|
|
|
|
|
|
|
# Path of the SQLite database file opened by the user-account helpers below.
DB_FILE = "users.db"
|
|
|
def create_user_table():
    """Create the ``users`` table in ``DB_FILE`` if it does not exist yet.

    The table has two TEXT columns: ``username`` (primary key) and
    ``password``.  The database connection is closed even when the
    statement or the commit raises.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS users (
                username TEXT PRIMARY KEY,
                password TEXT
            )
            """
        )
        conn.commit()
    finally:
        # Always release the handle; sqlite3 connections are not closed
        # automatically when an exception propagates.
        conn.close()
|
|
|
def add_user(username, password):
    """Insert a new user row, storing a SHA-256 hex digest of the password.

    Args:
        username: Name to register; it is the table's primary key.
        password: Plain-text password; only its digest is stored.

    Returns:
        True when the row was inserted, False when the username already
        exists (in which case an error is also shown via ``st.error``).
    """
    # NOTE(review): an unsalted SHA-256 digest is weak for password storage.
    # It is kept here because authenticate_user compares against the same
    # digest; switching schemes requires migrating the stored rows.
    hashed_password = hashlib.sha256(password.encode()).hexdigest()

    conn = sqlite3.connect(DB_FILE)
    try:
        conn.execute(
            "INSERT INTO users (username, password) VALUES (?, ?)",
            (username, hashed_password),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        # The primary-key constraint on username rejected a duplicate.
        st.error("Username already exists. Please choose a different username.")
        return False
    finally:
        # Close the connection on success, on duplicates and on any other error.
        conn.close()
    return True
|
|
|
def authenticate_user(username, password):
    """Look up a user whose name and password digest both match.

    Args:
        username: Name entered by the user.
        password: Plain-text password; it is hashed with SHA-256 before the
            comparison, matching how add_user stores it.

    Returns:
        The matching row as a tuple, or None when no row matches.
    """
    hashed_password = hashlib.sha256(password.encode()).hexdigest()

    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.execute(
            "SELECT * FROM users WHERE username = ? AND password = ?",
            (username, hashed_password),
        )
        return cursor.fetchone()
    finally:
        # Release the handle even if the query raises.
        conn.close()
|
|
|
def initialize_session_state():
    """Seed ``st.session_state`` with defaults for any key not yet present.

    Existing values are left untouched, so this is safe to call on every
    script run.
    """
    defaults = {
        "authenticated": False,
        "username": None,
        "page": "login",
        "current_project": None,
        "project_uploaded": False,
    }
    for key, default in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default
|
|
|
def main():
    """Render the app: title, state/database setup, then the current page.

    The page to draw is chosen by ``st.session_state.page``; an unknown
    page name renders nothing beyond the title.
    """
    st.title("SimplifAI")

    initialize_session_state()
    create_user_table()

    # Map each page name stored in session state to its render function.
    page_renderers = {
        "login": login_page,
        "workspace": workspace_page,
        "project_view": project_view_page,
        "generate_documentation": generate_documentation_page,
        "view_documentation": view_documentation_page,
    }
    render = page_renderers.get(st.session_state.page)
    if render is not None:
        render()
|
|
|
def login_page():
    """Render the combined log-in / registration page.

    A radio button switches between the two forms.  A successful log-in
    stores the user in session state, switches to the workspace page and
    reruns the script so the new page is drawn immediately.
    """
    st.subheader("Please Log In or Register to Continue")
    auth_mode = st.radio("Choose an Option", ["Log In", "Register"], horizontal=True)

    if auth_mode == "Log In":
        st.subheader("Log In")
        username = st.text_input("Username", key="login_username")
        password = st.text_input("Password", type="password", key="login_password")

        if st.button("Log In"):
            if authenticate_user(username, password):
                st.session_state.authenticated = True
                st.session_state.username = username
                st.session_state.page = "workspace"
                # Without a rerun the login form would stay on screen until
                # the user's next interaction.
                st.rerun()
            else:
                st.error("Invalid username or password. Please try again.")

    elif auth_mode == "Register":
        st.subheader("Register")
        username = st.text_input("Create Username", key="register_username")
        password = st.text_input("Create Password", type="password", key="register_password")

        if st.button("Register"):
            if username and password:
                # add_user shows its own error for a duplicate username; do
                # not announce success if it explicitly reports failure by
                # returning False.
                if add_user(username, password) is not False:
                    st.success("Account created successfully! You can now log in.")
            else:
                st.error("Please fill in all fields.")
|
|
|
|
|
def _is_valid_project_name(name):
    """Return True when *name* is a single, ordinary path component."""
    return name not in ("", ".", "..") and os.path.basename(name) == name


def workspace_page():
    """Render the workspace: project picker plus upload / clone forms.

    Projects live in ``user_projects/<username>/<project>``.  Selecting an
    existing project switches to the project view.  A new project is created
    either from uploaded files (``.zip`` archives are extracted in place) or
    by cloning a Git repository.
    """
    st.sidebar.title(f"Hello, {st.session_state.username}!")
    if st.sidebar.button("Log Out"):
        st.session_state.authenticated = False
        st.session_state.username = None
        st.session_state.page = "login"
        # Restart the script now: the code below builds a path from the
        # username, which has just been reset to None.
        st.rerun()

    user_folder = os.path.join("user_projects", st.session_state.username)
    os.makedirs(user_folder, exist_ok=True)

    # Every sub-directory of the user's folder is one project.
    projects = [
        entry
        for entry in os.listdir(user_folder)
        if os.path.isdir(os.path.join(user_folder, entry))
    ]

    selected_project = st.sidebar.selectbox("Projects", ["Select a project"] + projects)
    if selected_project != "Select a project":
        st.session_state.current_project = selected_project
        st.session_state.page = "project_view"
        st.rerun()

    # One-shot confirmation set by the upload/clone handlers before rerun.
    if st.session_state.project_uploaded:
        st.success(f"Project '{st.session_state.current_project}' uploaded successfully!")
        st.session_state.project_uploaded = False

    st.subheader("Workspace")
    st.write("You can create a new project by uploading files or folders, or by cloning a GitHub repository.")

    action = st.radio(
        "Choose an action",
        ["Upload Files or Folders", "Clone GitHub Repository"],
        horizontal=True,
    )
    project_name = st.text_input("Enter a project name")

    if action == "Upload Files or Folders":
        st.subheader("Upload Files or Folders")
        uploaded_files = st.file_uploader(
            "Upload one or more files or a .zip archive for folders",
            accept_multiple_files=True,
        )

        if uploaded_files and project_name:
            if st.button("Upload Project"):
                # The name becomes a directory under the user's folder, so it
                # must not be able to escape it (e.g. "../other_user").
                if not _is_valid_project_name(project_name):
                    st.error("Invalid project name. Please avoid path separators and relative path components.")
                    return

                project_folder = os.path.join(user_folder, project_name)
                os.makedirs(project_folder, exist_ok=True)

                for uploaded_file in uploaded_files:
                    # Keep only the final component of the client-supplied
                    # filename so the file lands inside the project folder.
                    file_path = os.path.join(project_folder, os.path.basename(uploaded_file.name))
                    with open(file_path, "wb") as f:
                        f.write(uploaded_file.getbuffer())

                    if uploaded_file.name.endswith(".zip"):
                        try:
                            with zipfile.ZipFile(file_path, "r") as zip_ref:
                                zip_ref.extractall(project_folder)
                            # Remove the archive only after it is closed.
                            os.remove(file_path)
                            st.success(f"Folder from {uploaded_file.name} extracted successfully!")
                        except zipfile.BadZipFile:
                            st.error(f"File {uploaded_file.name} is not a valid .zip file.")
                    else:
                        st.success(f"File {uploaded_file.name} saved successfully!")

                st.session_state.current_project = project_name
                st.session_state.project_uploaded = True
                st.rerun()

    elif action == "Clone GitHub Repository":
        st.subheader("Clone GitHub Repository")
        repo_url = st.text_input("Enter the GitHub repository URL")

        if repo_url and project_name:
            if st.button("Upload Project"):
                if not _is_valid_project_name(project_name):
                    st.error("Invalid project name. Please avoid path separators and relative path components.")
                    return

                project_folder = os.path.join(user_folder, project_name)
                os.makedirs(project_folder, exist_ok=True)

                try:
                    Repo.clone_from(repo_url, project_folder)

                    st.session_state.current_project = project_name
                    st.session_state.project_uploaded = True
                    st.rerun()
                except Exception as e:
                    st.error(f"Failed to clone repository: {e}")
|
|
|
|
|
|
|
|
|
|
|
# Configure the Gemini client at import time with the API key read from the
# GEMINI environment variable (os.getenv returns None when it is unset).
gemini = os.getenv("GEMINI")
genai.configure(api_key=gemini)
# Module-level model handle; identify_required_functions() calls
# model.generate_content() on it.
model = genai.GenerativeModel("gemini-1.5-flash")
|
|
|
|
|
|
|
def read_project_files(project_path):
    """Return the paths of all files under *project_path*, skipping ``.git``.

    Only directories named exactly ``.git`` are excluded; directories whose
    name merely contains that text (``.github``, ``repo.git`` ...) are kept.

    Args:
        project_path: Root directory of the project.

    Returns:
        A list of file paths, each joined onto the directory it was found
        in.  The list is empty when *project_path* does not exist.
    """
    file_paths = []
    for root, dirs, files in os.walk(project_path):
        # Pruning in place stops os.walk from descending into Git metadata.
        dirs[:] = [name for name in dirs if name != ".git"]
        file_paths.extend(os.path.join(root, name) for name in files)
    return file_paths
|
|
|
def read_files(file_paths):
    """Read the given files as UTF-8 text, skipping any that cannot be read.

    Args:
        file_paths: Iterable of file paths.

    Returns:
        A dict mapping each successfully read path to its text content.
        Missing files, files that are not valid UTF-8 and paths that cannot
        be opened (directories, permission errors) are reported with
        ``print`` and left out of the result.
    """
    file_contents = {}
    for file_path in file_paths:
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                file_contents[file_path] = file.read()
        except FileNotFoundError:
            print(f"File not found: {file_path}")
        except UnicodeDecodeError:
            print(f"Skipping binary or non-UTF-8 file: {file_path}")
        except OSError as exc:
            # One unreadable entry must not abort the whole scan.
            print(f"Skipping unreadable file: {file_path} ({exc})")
    return file_contents
|
|
|
|
|
def generate_prompt(file_contents, functionality_description):
    """Build the Gemini prompt that asks which functions implement a feature.

    Args:
        file_contents: Mapping of file path to file text.  Only the base
            name of each path is written into the prompt.
        functionality_description: The user's description of the feature.

    Returns:
        The prompt string: task statement, one ``File:`` section per entry,
        the per-function questions, and the required output format.
    """
    task_statement = (
        "Analyze the following code files to identify all functions required to implement the functionality: "
        f"'{functionality_description}'.\n\n"
    )

    file_sections = "".join(
        f"File: {os.path.basename(path)}\n{source}\n\n"
        for path, source in file_contents.items()
    )

    questions = (
        "For each relevant function, provide:\n"
        "1. Which file the function is found in.\n"
        "2. The function name.\n"
        "3. Dependencies on other functions or modules.\n"
    )

    output_format = """
Return your output in the following format providing no commentary:

Project Summary:
<A summary of the project>
Functionality:
<a more precise version of the user defined funcionality>
Functions:
<for each file containing relevant functions>
<file name of the file that contains relevant functions>:
-<relevant function header from the current file>:
-Function Dependencies:<function dependencies seperated by commas>
"""

    return task_statement + file_sections + questions + output_format
|
|
|
def identify_required_functions(project_path, functionality_description):
    """Ask Gemini which functions in a project implement a functionality.

    Collects the project's file paths, reads their contents, builds the
    analysis prompt and sends it to the module-level ``model``.

    Args:
        project_path: Root directory of the project to analyze.
        functionality_description: The user's description of the feature.

    Returns:
        The ``text`` attribute of the response returned by
        ``model.generate_content``.
    """
    sources = read_files(read_project_files(project_path))
    gemini_prompt = generate_prompt(sources, functionality_description)
    return model.generate_content(gemini_prompt).text
|
|
|
|
|
def extract_cleaned_gemini_output(gemini_output):
    """Keep only section headers and ``-public``/``-private`` lines.

    NOTE(review): a second function with this same name is defined further
    down in this file; that later definition replaces this one when the
    module is loaded, so this version is never the one that gets called.

    Args:
        gemini_output (str): The raw output returned by Gemini.

    Returns:
        str: The retained lines (stripped), joined with newlines.
    """
    capture_headers = ("Project Summary:", "Functions:")
    plain_headers = ("Functionality:", "Functionality Flow:", "Function Documentation:")
    signature_prefixes = ("-public", "-private")

    kept = []
    capturing = False
    for raw_line in gemini_output.splitlines():
        text = raw_line.strip()
        if text.startswith(capture_headers):
            # These headers open a region in which signatures are collected.
            capturing = True
            kept.append(text)
        elif text.startswith(plain_headers):
            kept.append(text)
        elif not capturing:
            continue
        elif text.startswith(signature_prefixes):
            kept.append(text)
        elif not text:
            # A blank line ends the capture region.
            capturing = False

    return "\n".join(kept)
|
|
|
|
|
|
|
|
|
def split_into_chunks(content, chunk_size=1000):
    """Cut *content* into consecutive slices of at most *chunk_size* items.

    Args:
        content: Any sliceable sequence with a length (e.g. a string).
        chunk_size: Maximum length of each slice.

    Returns:
        A list of slices in order; the last one may be shorter.  Empty
        content yields an empty list.
    """
    chunks = []
    for start in range(0, len(content), chunk_size):
        chunks.append(content[start:start + chunk_size])
    return chunks
|
|
|
|
|
# Hugging Face Inference API endpoint for the Qwen2.5-Coder-32B-Instruct model.
API_URL = "https://api-inference.huggingface.co/models/Qwen/Qwen2.5-Coder-32B-Instruct"
# Access token read from the QWEN environment variable.  os.getenv returns
# None when it is unset, in which case the header value below becomes the
# literal string "Bearer None".
qwen = os.getenv("QWEN")
headers = {"Authorization": f"Bearer {qwen}"}
|
|
|
def extract_cleaned_gemini_output(gemini_output):
    """Reduce Gemini's answer to its headers and its function list.

    Lines are stripped, then kept when they start with ``Project Summary:``
    or ``Functionality:``, when they are the ``Functions:`` header, or when
    they are non-empty and come after that header.  Before the functions
    section starts, a line beginning with ``File:`` or containing ``Qwen,``
    ends the scan.

    NOTE(review): once the functions section has started, every non-empty
    line is kept, so the ``File:`` / ``Qwen,`` stop check can no longer
    trigger there.  Confirm whether that is intended.

    Args:
        gemini_output (str): The output returned by Gemini.

    Returns:
        str: The retained lines joined with newlines.
    """
    collected = []
    in_functions = False

    for raw_line in gemini_output.splitlines():
        entry = raw_line.strip()

        if entry.startswith(("Project Summary:", "Functionality:")):
            collected.append(entry)
            continue

        if entry.startswith("Functions:"):
            collected.append(entry)
            in_functions = True
            continue

        if in_functions and entry:
            collected.append(entry)
            continue

        if entry.startswith("File:") or "Qwen," in entry:
            break

    return "\n".join(collected)
|
|
|
|
|
|
|
def clean_output(output):
    """Trim Qwen's response to the text from the first known header onward.

    Nothing is kept until a line starts with ``Project Summary:``,
    ``Functionality Summary:``, ``Functionality Flow:`` or
    ``Function Documentation:``.  From that header on, every non-empty line
    (stripped) is kept.

    NOTE(review): the ``File:`` / ``User-specified functionality:`` branch
    only runs while nothing is being kept anyway, so it never changes the
    result.  Confirm whether it was meant to end a section.

    Args:
        output: Raw text returned by the model.

    Returns:
        The retained lines joined with newlines.
    """
    section_headers = (
        "Project Summary:",
        "Functionality Summary:",
        "Functionality Flow:",
        "Function Documentation:",
    )
    reset_markers = ("File:", "User-specified functionality:")

    kept = []
    inside_section = False
    for raw_line in output.splitlines():
        text = raw_line.strip()
        if text.startswith(section_headers):
            inside_section = True
            kept.append(text)
        elif inside_section and text:
            kept.append(text)
        elif text.startswith(reset_markers):
            inside_section = False

    return "\n".join(kept)
|
|
|
|
|
|
|
|
|
def validate_and_generate_documentation(api_url, headers, gemini_output, functionality_description):
    """Generate documentation with Qwen via the Hugging Face Inference API.

    Gemini's output is condensed with ``extract_cleaned_gemini_output``,
    embedded in a task prompt and posted to *api_url*.  The generated text
    is passed through ``clean_output`` before it is returned.

    Args:
        api_url: Inference endpoint URL.
        headers: HTTP headers for the request (carries the bearer token).
        gemini_output: Raw text returned by Gemini.
        functionality_description: The user's description of the feature.

    Returns:
        The cleaned documentation text; an empty string when the response
        contains no ``generated_text``.

    Raises:
        ValueError: If the API answers with a status code other than 200.
        requests.exceptions.RequestException: On network failure or when
            the request exceeds the timeout.
    """
    cleaned_gemini_output = extract_cleaned_gemini_output(gemini_output)

    prompt = f"""
User-specified functionality: '{functionality_description}'
Functions identified by Gemini:
{cleaned_gemini_output}

Tasks:
1. Generate a project summary:
'
Project Summary:
<Include project description and library or module dependencies>
'
2. Refine the user-defined functionality:
'
Functionality Summary:
<Provide an enhanced description of user-specified functionality>
'
3. Describe the functionality flow:
'
Functionality Flow:
<Explain the sequence of functions and data flow>
'
4. Generate detailed documentation for each function:
'
Function Documentation:
For each relevant function:
- Summary: <Description of the function's purpose>
- Inputs: <Details of inputs and their types>
- Outputs: <Details of outputs and their types>
- Dependencies: <Dependencies on other modules/functions>
- Data structures: <Details of data structures used>
- Algorithmic Details: <Description of the algorithm used>
- Error Handling: <Description of how the function handles errors>
- Assumptions: <Any assumptions the function makes>
- Example Usage: <Example demonstrating usage>
'
5. Return only the required information for the above tasks, and exclude everything else.
"""

    payload = {"inputs": prompt, "parameters": {"max_new_tokens": 1024}}
    # Without a timeout a stalled connection would block the app forever.
    response = requests.post(api_url, headers=headers, json=payload, timeout=120)

    if response.status_code != 200:
        raise ValueError(f"Error during API call: {response.status_code}, {response.text}")

    api_response = response.json()
    # The payload is either a single object or a list of objects; an empty
    # list or an unexpected element type yields an empty result.
    if isinstance(api_response, list):
        api_response = api_response[0] if api_response else {}
    output = api_response.get("generated_text", "") if isinstance(api_response, dict) else ""
    return clean_output(output)
|
|
|
|
|
def generate_documentation_page():
    """Render the page that generates documentation for the current project.

    The user describes a functionality; on "Analyze" the project files are
    sent to Gemini and the result is refined through the Hugging Face API.
    The final text, or any error raised along the way, is shown on the page.
    """
    st.subheader(f"Generate Documentation for {st.session_state.current_project}")
    st.write("Enter the functionality or parts of the project for which you'd like to generate documentation.")

    # Give the user a way back to the project view from this page.
    if st.button("Back to Project"):
        st.session_state.page = "project_view"
        st.rerun()

    functionality = st.text_area(
        "Describe the functionality",
        placeholder="e.g., Explain the function of the file `main.py`",
    )

    if not st.button("Analyze"):
        return

    if not functionality.strip():
        st.error("Please enter the functionality to analyze.")
        return

    st.write("Analyzing project files... Please wait.")

    user_folder = os.path.join("user_projects", st.session_state.username)
    project_folder = os.path.join(user_folder, st.session_state.current_project)

    if not os.path.exists(project_folder):
        st.error("Project folder not found. Ensure the GitHub repository was cloned successfully.")
        return

    try:
        gemini_result = identify_required_functions(project_folder, functionality)
        final_documentation = validate_and_generate_documentation(
            API_URL, headers, gemini_result, functionality
        )

        st.success("Documentation generated successfully!")
        st.text_area("Generated Documentation", final_documentation, height=600)
    except Exception as e:
        # Page-level boundary: surface any failure to the user.
        st.error(f"An error occurred: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def view_documentation_page():
    """Render the documentation viewer page for the current project.

    Shows a heading, a short description and a button that returns to the
    project view.
    """
    project = st.session_state.current_project
    st.subheader(f"View Documentation for {project}")
    st.write("This page will display the generated documentation for the selected project.")

    back_clicked = st.button("Back to Project")
    if back_clicked:
        st.session_state.page = "project_view"
        st.rerun()
|
|
|
def project_view_page():
    """Render the page for a single project.

    The sidebar offers navigation back to the workspace and log-out.  The
    main area links to the documentation pages and can toggle a listing of
    the project's folders and files.
    """
    st.sidebar.title(f"Project: {st.session_state.current_project}")
    if st.sidebar.button("Back to Workspace"):
        st.session_state.page = "workspace"
        st.rerun()
    if st.sidebar.button("Log Out"):
        st.session_state.authenticated = False
        st.session_state.username = None
        st.session_state.page = "login"
        st.rerun()

    st.subheader(f"Project: {st.session_state.current_project}")
    st.write("Manage your project and explore its files.")

    if st.button("Generate Documentation"):
        st.session_state.page = "generate_documentation"
        st.rerun()

    if st.button("View Documentation"):
        st.session_state.page = "view_documentation"
        st.rerun()

    if "show_file_structure" not in st.session_state:
        st.session_state.show_file_structure = False

    # Each click flips the visibility flag kept in session state.
    if st.button("Show File Structure"):
        st.session_state.show_file_structure = not st.session_state.show_file_structure

    if st.session_state.show_file_structure:
        user_folder = os.path.join("user_projects", st.session_state.username)
        project_folder = os.path.join(user_folder, st.session_state.current_project)

        st.write("File structure:")

        for root, dirs, files in os.walk(project_folder):
            # Depth below the project root: 0 for the root itself.
            relative = os.path.relpath(root, project_folder)
            level = 0 if relative == os.curdir else relative.count(os.sep) + 1
            indent = " " * 4 * level
            sub_indent = " " * 4 * (level + 1)

            if level == 0:
                st.write(f"📁 {os.path.basename(root)}")
                # Files directly in the project root are listed as well.
                for file in files:
                    st.write(f"{sub_indent}📄 {file}")
            else:
                with st.expander(f"{indent}📁 {os.path.basename(root)}"):
                    for file in files:
                        st.write(f"{sub_indent}📄 {file}")
|
|
|
|
|
|
|
# Run the app only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|
|
|
|