# database.py
import hashlib
import os

import chromadb
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

from parser import parse_python_code  # local parser.py, not the removed stdlib module

# User-configurable variables
DB_NAME = "python_programs" # ChromaDB collection name
HF_DATASET_NAME = "python_program_vectors" # Hugging Face Dataset name
HF_TOKEN = "YOUR_HUGGINGFACE_TOKEN" # Replace with your Hugging Face API token
PERSIST_DIR = "./chroma_data" # Directory for persistent storage (optional)

def init_chromadb(persist_dir=PERSIST_DIR):
    """Initialize a ChromaDB client, optionally with persistent storage."""
    try:
        # Use persistent storage if the directory exists, otherwise in-memory
        if os.path.exists(persist_dir):
            client = chromadb.PersistentClient(path=persist_dir)
        else:
            client = chromadb.Client()
        return client
    except Exception as e:
        print(f"Error initializing ChromaDB: {e}")
        return chromadb.Client()  # Fall back to in-memory

def create_collection(client, collection_name=DB_NAME):
    """Create or get a ChromaDB collection for Python programs."""
    try:
        collection = client.get_collection(name=collection_name)
    except Exception:
        collection = client.create_collection(name=collection_name)
    return collection

def store_program(client, code, sequence, vectors, collection_name=DB_NAME):
    """Store a program in ChromaDB with its code, sequence, and pooled vector."""
    collection = create_collection(client, collection_name)
    # ChromaDB requires a fixed embedding dimensionality per collection, so pool
    # the per-part vectors (each 6-dimensional) into a single mean vector rather
    # than concatenating them, which would vary in length from program to program.
    pooled_vector = np.mean(vectors, axis=0).tolist() if vectors else [0] * 6
    # Use a stable content hash as the ID; Python's built-in hash() is salted
    # per process and would yield different IDs across runs.
    program_id = hashlib.sha256(code.encode("utf-8")).hexdigest()
    collection.add(
        documents=[code],
        metadatas=[{"sequence": ",".join(sequence)}],
        ids=[program_id],
        embeddings=[pooled_vector]
    )
    return program_id
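
# Typical usage, mirroring populate_sample_db below (assumes each part returned
# by parse_python_code carries a 6-dimensional 'vector' entry):
#   parts, sequence = parse_python_code(code)
#   vectors = [part['vector'] for part in parts]
#   program_id = store_program(client, code, sequence, vectors)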

def populate_sample_db(client):
    """Populate ChromaDB with sample Python programs."""
    samples = [
        """
import os
def add_one(x):
    y = x + 1
    return y
""",
        """
def multiply(a, b):
    c = a * b
    if c > 0:
        return c
""",
    ]
    for code in samples:
        parts, sequence = parse_python_code(code)
        vectors = [part['vector'] for part in parts]
        store_program(client, code, sequence, vectors)

def query_programs(client, operations, collection_name=DB_NAME, top_k=5):
    """Query ChromaDB for programs matching the operations sequence."""
    collection = create_collection(client, collection_name)
    # Convert operations to a query vector (mean of the operation vectors)
    if operations:
        query_vector = np.mean(
            [create_vector(op, 0, (1, 1), 100, []) for op in operations], axis=0
        ).tolist()
    else:
        query_vector = [0] * 6
    # Perform similarity search; request embeddings so we can re-score below
    results = collection.query(
        query_embeddings=[query_vector],
        n_results=top_k,
        include=["documents", "metadatas", "embeddings"]
    )
    # Keep only programs whose stored sequence contains the requested operations
    # as a subsequence, then rank by cosine similarity to the query vector.
    matching_programs = []
    for doc_id, doc, meta, embedding in zip(
        results['ids'][0], results['documents'][0],
        results['metadatas'][0], results['embeddings'][0]
    ):
        sequence = meta['sequence'].split(',')
        if is_subsequence(operations, sequence):
            program_vector = list(embedding)
            similarity = cosine_similarity([query_vector], [program_vector])[0][0]
            matching_programs.append(
                {'id': doc_id, 'code': doc, 'similarity': similarity}
            )
    return sorted(matching_programs, key=lambda x: x['similarity'], reverse=True)
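
# query_programs returns a list of {'id', 'code', 'similarity'} dicts sorted by
# similarity, highest first. Example (operation names must be category names
# recognized by create_vector below):
#   matches = query_programs(client, ['function', 'return'])
#   best_code = matches[0]['code'] if matches else None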

def create_vector(category, level, location, total_lines, parent_path):
    """Helper to create a vector for a query (matches the parser's create_vector)."""
    category_map = {
        'import': 1, 'function': 2, 'async_function': 3, 'class': 4,
        'if': 5, 'while': 6, 'for': 7, 'try': 8, 'expression': 9, 'spacer': 10,
        'other': 11, 'elif': 12, 'else': 13, 'except': 14, 'finally': 15, 'return': 16,
        'assigned_variable': 17, 'input_variable': 18, 'returned_variable': 19
    }
    category_id = category_map.get(category, 0)
    start_line, end_line = location
    span = (end_line - start_line + 1) / total_lines
    center_pos = ((start_line + end_line) / 2) / total_lines
    parent_depth = len(parent_path)
    parent_weight = sum(category_map.get(parent.split('[')[0].lower(), 0) * (1 / (i + 1))
                        for i, parent in enumerate(parent_path)) / max(1, len(category_map))
    return [category_id, level, center_pos, span, parent_depth, parent_weight]
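
# Worked example (hypothetical inputs): an 'if' block at nesting level 2,
# spanning lines 4-6 of a 10-line program, nested inside a function whose
# parent-path entry is 'function[2]':
#   create_vector('if', 2, (4, 6), 10, ['function[2]'])
#   -> [5, 2, 0.5, 0.3, 1, 2/19]  # category, level, center, span, depth, weight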

def is_subsequence(subseq, seq):
    """Check if subseq is an order-preserving (not necessarily contiguous) subsequence of seq."""
    # Each `item in it` advances the shared iterator past the match, so all
    # items of subseq must appear in seq in order.
    it = iter(seq)
    return all(item in it for item in subseq)
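
# e.g. is_subsequence(['a', 'c'], ['a', 'b', 'c']) -> True  (found in order)
#      is_subsequence(['c', 'a'], ['a', 'b', 'c']) -> False (out of order)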

def save_chromadb_to_hf(dataset_name=HF_DATASET_NAME, token=HF_TOKEN):
    """Save ChromaDB data to a Hugging Face Dataset."""
    from datasets import Dataset

    client = init_chromadb()
    collection = create_collection(client)
    # Fetch all data from ChromaDB
    results = collection.get(include=["documents", "metadatas", "embeddings"])
    data = {
        "code": results["documents"],
        "sequence": [meta["sequence"] for meta in results["metadatas"]],
        # Embeddings are already flat per-program vectors; store them as-is
        "vectors": [list(vec) for vec in results["embeddings"]],
    }
    # Create a Hugging Face Dataset and push it to the Hub
    dataset = Dataset.from_dict(data)
    dataset.push_to_hub(dataset_name, token=token)
    print(f"Dataset pushed to Hugging Face Hub as {dataset_name}")

def load_chromadb_from_hf(dataset_name=HF_DATASET_NAME, token=HF_TOKEN):
    """Load ChromaDB data from a Hugging Face Dataset."""
    from datasets import load_dataset

    client = init_chromadb()
    collection = create_collection(client)
    dataset = load_dataset(dataset_name, split="train", token=token)
    for item in dataset:
        collection.add(
            documents=[item["code"]],
            metadatas=[{"sequence": item["sequence"]}],
            # Mirror store_program's stable content-hash IDs
            ids=[hashlib.sha256(item["code"].encode("utf-8")).hexdigest()],
            embeddings=[item["vectors"]]
        )
    return client

if __name__ == '__main__':
    client = init_chromadb()
    populate_sample_db(client)
    # Uncomment to save to Hugging Face
    # save_chromadb_to_hf()
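
    # Example query against the sample DB (assumes parse_python_code emits the
    # 'function' and 'return' category names used by create_vector above)
    for match in query_programs(client, ['function', 'return']):
        print(f"{match['id'][:12]}... similarity={match['similarity']:.3f}")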