# app.py
from flask import Flask, request, render_template, jsonify, send_file
from werkzeug.utils import secure_filename
from parser import parse_python_code
import os
import json
import io
import chromadb
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from datasets import Dataset, load_dataset
from transformers import AutoTokenizer, AutoModel
import torch
import subprocess  # To call process_hf_dataset.py
import hashlib  # Deterministic program IDs (built-in hash() is salted per process)
# User-configurable variables
DB_NAME = "python_programs" # ChromaDB collection name
HF_DATASET_NAME = "python_program_vectors" # Hugging Face Dataset name
HF_KEY = "YOUR_HUGGINGFACE_KEY" # Replace with your Hugging Face API key
UPLOAD_DIR = "./uploads" # Directory for uploads
PERSIST_DIR = "./chroma_data" # Directory for persistent ChromaDB storage
USE_GPU = False # Default to CPU, set to True for GPU if available
app = Flask(__name__)
def reconstruct_code(parts):
"""Reconstruct the original code from parsed parts."""
sorted_parts = sorted(parts, key=lambda p: p['location'][0])
return ''.join(part['source'] for part in sorted_parts)
def init_chromadb(persist_dir=PERSIST_DIR):
"""Initialize ChromaDB client, optionally with persistent storage."""
try:
# Use persistent storage if directory exists, otherwise in-memory
if os.path.exists(persist_dir):
client = chromadb.PersistentClient(path=persist_dir)
else:
client = chromadb.Client()
return client
except Exception as e:
print(f"Error initializing ChromaDB: {e}")
return chromadb.Client() # Fallback to in-memory
def create_collection(client, collection_name=DB_NAME):
    """Create or get a ChromaDB collection for Python programs."""
    return client.get_or_create_collection(name=collection_name)
def store_program(client, code, sequence, vectors, collection_name=DB_NAME):
    """Store a program in ChromaDB with its code, sequence, and vectors."""
    collection = create_collection(client, collection_name)
    # ChromaDB requires every embedding in a collection to share one dimensionality,
    # and queries use 6-D vectors, so store the element-wise mean of the per-part 6-D vectors.
    program_vector = np.mean([np.array(v, dtype=float) for v in vectors], axis=0).tolist() if vectors else [0] * 6
    # Use a deterministic hash of the code as the ID (hash() changes between runs)
    program_id = hashlib.sha256(code.encode('utf-8')).hexdigest()
    collection.upsert(
        documents=[code],
        metadatas=[{"sequence": ",".join(sequence), "description_tokens": " ".join(generate_description_tokens(sequence, vectors))}],
        ids=[program_id],
        embeddings=[program_vector]
    )
    return program_id
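# Example (hypothetical values): for a two-part program whose parts have vectors
# [1, 0, 0.1, 0.1, 0, 0.0] and [2, 0, 0.5, 0.3, 0, 0.0], the stored embedding is
# their element-wise mean [1.5, 0.0, 0.3, 0.2, 0.0, 0.0], and the returned ID is the
# SHA-256 hex digest of the source text.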
def populate_sample_db(client):
"""Populate ChromaDB with sample Python programs."""
samples = [
"""
import os
def add_one(x):
y = x + 1
return y
""",
"""
def multiply(a, b):
c = a * b
if c > 0:
return c
"""
]
for code in samples:
parts, sequence = parse_python_code(code)
vectors = [part['vector'] for part in parts]
store_program(client, code, sequence, vectors)
def query_programs(client, operations, collection_name=DB_NAME, top_k=5, semantic_query=None):
    """Query ChromaDB for programs matching the operations sequence or semantic description."""
    collection = create_collection(client, collection_name)
    if semantic_query:
        # Semantic search using CodeBERT embeddings
        query_vector = generate_semantic_vector(semantic_query)
    elif operations:
        # Vector-based search: element-wise mean of the operation vectors
        query_vector = np.mean([create_vector(op, 0, (1, 1), 100, []) for op in operations], axis=0).tolist()
    else:
        query_vector = [0] * 6
    results = collection.query(
        query_embeddings=[query_vector],
        n_results=top_k,
        include=["documents", "metadatas", "embeddings"]
    )
    # Process results (ChromaDB returns matching ids alongside the requested fields)
    matching_programs = []
    for prog_id, doc, meta, emb in zip(results['ids'][0], results['documents'][0],
                                       results['metadatas'][0], results['embeddings'][0]):
        sequence = meta['sequence'].split(',')
        # For operation queries, require the requested ops to appear as a subsequence
        if semantic_query or is_subsequence(operations, sequence):
            try:
                emb = np.asarray(emb, dtype=float)
                # Stored embeddings are 6-D means; older flattened data is averaged in 6-D chunks
                program_vector = emb.tolist() if emb.size == 6 else emb.reshape(-1, 6).mean(axis=0).tolist()
            except (ValueError, TypeError):
                program_vector = [0] * 6  # Fallback for malformed embeddings
            similarity = cosine_similarity([query_vector], [program_vector])[0][0]
            matching_programs.append({'id': prog_id, 'code': doc, 'similarity': similarity,
                                      'description': meta.get('description_tokens', '')})
    return sorted(matching_programs, key=lambda x: x['similarity'], reverse=True)
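# Each query result is a dict of the form
#   {'id': <program id>, 'code': <source text>, 'similarity': <float>,
#    'description': <space-separated description tokens>}
# and the list is sorted by descending cosine similarity.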
def create_vector(category, level, location, total_lines, parent_path):
"""Helper to create a vector for query (matches parser's create_vector)."""
category_map = {
'import': 1, 'function': 2, 'async_function': 3, 'class': 4,
'if': 5, 'while': 6, 'for': 7, 'try': 8, 'expression': 9, 'spacer': 10,
'other': 11, 'elif': 12, 'else': 13, 'except': 14, 'finally': 15, 'return': 16,
'assigned_variable': 17, 'input_variable': 18, 'returned_variable': 19
}
category_id = category_map.get(category, 0)
start_line, end_line = location
span = (end_line - start_line + 1) / total_lines
center_pos = ((start_line + end_line) / 2) / total_lines
parent_depth = len(parent_path)
parent_weight = sum(category_map.get(parent.split('[')[0].lower(), 0) * (1 / (i + 1))
for i, parent in enumerate(parent_path)) / max(1, len(category_map))
return [category_id, level, center_pos, span, parent_depth, parent_weight]
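# Example (hypothetical part): a top-level function spanning lines 3-5 of a 10-line
# file with no parents yields
#   create_vector('function', 0, (3, 5), 10, [])
#   -> [2, 0, 0.4, 0.3, 0, 0.0]
# i.e. [category_id, level, center_pos, span, parent_depth, parent_weight].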
def is_subsequence(subseq, seq):
"""Check if subseq is a subsequence of seq."""
it = iter(seq)
return all(item in it for item in subseq)
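# Example:
#   is_subsequence(['import', 'function'], ['import', 'assigned_variable', 'function', 'return'])
#   -> True  (order must match, but other items may appear in between)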
def generate_description_tokens(sequence, vectors):
"""Generate semantic description tokens for a program based on its sequence and vectors."""
tokens = []
category_descriptions = {
'import': 'imports module',
'function': 'defines function',
'assigned_variable': 'assigns variable',
'input_variable': 'input parameter',
'returned_variable': 'returns value',
'if': 'conditional statement',
'return': 'returns result',
'try': 'try block',
'except': 'exception handler',
'expression': 'expression statement',
'spacer': 'empty line or comment'
}
for cat, vec in zip(sequence, vectors):
if cat in category_descriptions:
tokens.append(f"{category_descriptions[cat]}:{cat}")
# Add vector-derived features (e.g., level, span) as tokens
tokens.append(f"level:{vec[1]}")
tokens.append(f"span:{vec[3]:.2f}")
return tokens
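# Example (hypothetical vectors): sequence ['import', 'function'] with vectors
# [[1, 0, 0.05, 0.10, 0, 0.0], [2, 1, 0.50, 0.40, 1, 0.1]] yields
#   ['imports module:import', 'level:0', 'span:0.10',
#    'defines function:function', 'level:1', 'span:0.40']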
def generate_semantic_vector(description, use_gpu=USE_GPU):
"""Generate a semantic vector for a textual description using CodeBERT, with CPU/GPU option."""
# Load CodeBERT model and tokenizer
model_name = "microsoft/codebert-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)
device = torch.device("cuda" if use_gpu and torch.cuda.is_available() else "cpu")
model = AutoModel.from_pretrained(model_name).to(device)
# Tokenize and encode the description
inputs = tokenizer(description, return_tensors="pt", padding=True, truncation=True, max_length=512)
inputs = {k: v.to(device) for k, v in inputs.items()}
# Generate embeddings
with torch.no_grad():
outputs = model(**inputs)
# Use mean pooling of the last hidden states
vector = outputs.last_hidden_state.mean(dim=1).squeeze().cpu().numpy().tolist()
# Truncate or pad to 6D to match our vectors
if len(vector) < 6:
vector.extend([0] * (6 - len(vector)))
elif len(vector) > 6:
vector = vector[:6]
return vector
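# Note: the 768-D CodeBERT embedding is truncated to its first 6 components to match
# the structural vectors above, so most of the semantic signal is discarded, and the
# model and tokenizer are reloaded on every call; caching them at module level would
# avoid that cost.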
def save_chromadb_to_hf(dataset_name=HF_DATASET_NAME, token=HF_KEY):
"""Save ChromaDB data to Hugging Face Dataset."""
client = init_chromadb()
collection = create_collection(client)
# Fetch all data from ChromaDB
results = collection.get(include=["documents", "metadatas", "embeddings"])
data = {
"code": results["documents"],
"sequence": [meta["sequence"] for meta in results["metadatas"]],
"vectors": results["embeddings"], # ChromaDB already flattens embeddings
"description_tokens": [meta.get('description_tokens', '') for meta in results["metadatas"]]
}
# Create a Hugging Face Dataset
dataset = Dataset.from_dict(data)
# Push to Hugging Face Hub
dataset.push_to_hub(dataset_name, token=token)
print(f"Dataset pushed to Hugging Face Hub as {dataset_name}")
def load_chromadb_from_hf(dataset_name=HF_DATASET_NAME, token=HF_KEY):
"""Load ChromaDB data from Hugging Face Dataset, handle empty dataset."""
try:
dataset = load_dataset(dataset_name, split="train", token=token)
except Exception as e:
print(f"Error loading dataset from Hugging Face: {e}. Populating with samples...")
client = init_chromadb()
populate_sample_db(client)
save_chromadb_to_hf() # Create and push a new dataset
        return client  # Reuse the client that was just populated
client = init_chromadb()
collection = create_collection(client)
for item in dataset:
        collection.upsert(
            documents=[item["code"]],
            metadatas=[{"sequence": item["sequence"], "description_tokens": item["description_tokens"]}],
            ids=[hashlib.sha256(item["code"].encode('utf-8')).hexdigest()],
            embeddings=[item["vectors"]]
        )
)
return client
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
parts = None
filename = 'unnamed.py'
code_input = None
query_results = None
# Handle file upload or pasted code (parsing)
if 'file' in request.files and request.files['file'].filename:
file = request.files['file']
if not file.filename.endswith('.py'):
return 'Invalid file type. Please upload a Python file.', 400
            filename = secure_filename(file.filename)
            file_path = os.path.join(UPLOAD_DIR, filename)
            file.save(file_path)
with open(file_path, 'r') as f:
code_input = f.read()
parts, sequence = parse_python_code(code_input)
# Store in ChromaDB
client = init_chromadb()
vectors = [part['vector'] for part in parts]
store_program(client, code_input, sequence, vectors, DB_NAME)
elif 'code' in request.form and request.form['code'].strip():
code_input = request.form['code']
filename = request.form.get('filename', 'unnamed.py') or 'unnamed.py'
if not filename.endswith('.py'):
filename += '.py'
parts, sequence = parse_python_code(code_input)
vectors = [part['vector'] for part in parts]
client = init_chromadb()
store_program(client, code_input, sequence, vectors, DB_NAME)
elif 'query_ops' in request.form and request.form['query_ops'].strip():
# Handle query for operations (category sequence)
operations = [op.strip() for op in request.form['query_ops'].split(',')]
client = load_chromadb_from_hf(HF_DATASET_NAME, HF_KEY) # Load from Hugging Face
query_results = query_programs(client, operations, DB_NAME)
return render_template(
'results_partial.html',
parts=None,
filename=filename,
reconstructed_code=None,
code_input=None,
query_results=query_results
)
elif 'semantic_query' in request.form and request.form['semantic_query'].strip():
# Handle semantic query (natural language description)
semantic_query = request.form['semantic_query']
client = load_chromadb_from_hf(HF_DATASET_NAME, HF_KEY) # Load from Hugging Face
query_results = query_programs(client, None, DB_NAME, semantic_query=semantic_query)
return render_template(
'results_partial.html',
parts=None,
filename=filename,
reconstructed_code=None,
code_input=None,
query_results=query_results
)
elif 'process_hf' in request.form:
# Trigger processing of Hugging Face dataset
try:
subprocess.run(['python', 'process_hf_dataset.py'], check=True)
return render_template(
'results_partial.html',
parts=None,
filename="Hugging Face Dataset Processed",
reconstructed_code=None,
code_input=None,
query_results=None,
message="Hugging Face dataset processed and stored successfully."
)
except subprocess.CalledProcessError as e:
return f"Error processing Hugging Face dataset: {e}", 500
if parts:
indexed_parts = [{'index': i + 1, **part} for i, part in enumerate(parts)]
reconstructed_code = reconstruct_code(indexed_parts)
return render_template(
'results_partial.html',
parts=indexed_parts,
filename=filename,
reconstructed_code=reconstructed_code,
code_input=code_input,
query_results=None
)
return 'No file, code, or query provided', 400
# Initial page load
client = load_chromadb_from_hf(HF_DATASET_NAME, HF_KEY) # Load from Hugging Face on startup
    # If the collection is missing or empty, populate it with samples
    try:
        if create_collection(client).count() == 0:
            populate_sample_db(client)
    except Exception:
        populate_sample_db(client)
return render_template('index.html', parts=None, filename=None, reconstructed_code=None, code_input=None, query_results=None)
@app.route('/export_json', methods=['POST'])
def export_json():
parts = request.json.get('parts', [])
export_data = [{'vector': part['vector'], 'source': part['source'], 'description': generate_description_tokens([part['category']], [part['vector']])} for part in parts]
json_str = json.dumps(export_data, indent=2)
buffer = io.BytesIO(json_str.encode('utf-8'))
buffer.seek(0)
return send_file(
buffer,
as_attachment=True,
download_name='code_vectors.json',
mimetype='application/json'
)
if __name__ == '__main__':
    os.makedirs(UPLOAD_DIR, exist_ok=True)
app.run(host="0.0.0.0", port=7860) # Bind to all interfaces for Hugging Face Spaces
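# Example requests (assuming the app is running locally on port 7860):
#   # Parse and store a pasted snippet
#   curl -X POST -F "code=def f(x): return x + 1" -F "filename=f.py" http://localhost:7860/
#   # Query stored programs by operation sequence
#   curl -X POST -F "query_ops=import,function,return" http://localhost:7860/
#   # Query by natural-language description
#   curl -X POST -F "semantic_query=function that adds one to a number" http://localhost:7860/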