Spaces:
Running
Running
import ast | |
import os | |
import json | |
from typing import Dict, List, Any, Optional, Union | |
from dataclasses import dataclass, asdict | |
from pathlib import Path | |
import hashlib | |
class CodeChunk: | |
"""Represents a chunk of code with metadata""" | |
content: str | |
chunk_type: str # 'function', 'class', 'method', 'import', 'variable', 'comment', 'module' | |
name: str | |
file_path: str | |
start_line: int | |
end_line: int | |
start_col: int | |
end_col: int | |
parent_name: Optional[str] = None | |
docstring: Optional[str] = None | |
parameters: Optional[List[str]] = None | |
return_type: Optional[str] = None | |
decorators: Optional[List[str]] = None | |
complexity_score: Optional[int] = None | |
dependencies: Optional[List[str]] = None | |
chunk_id: Optional[str] = None | |
def __post_init__(self): | |
# Generate unique ID based on content and location | |
content_hash = hashlib.md5( | |
f"{self.file_path}:{self.start_line}:{self.end_line}:{self.content}".encode() | |
).hexdigest()[:8] | |
self.chunk_id = f"{self.chunk_type}_{self.name}_{content_hash}" | |
class CodeChunker: | |
"""Main class for chunking code using AST""" | |
def __init__(self, supported_extensions: List[str] = None): | |
self.supported_extensions = supported_extensions or ['.py', '.js', '.ts', '.java', '.cpp', '.c', '.h'] | |
self.chunks: List[CodeChunk] = [] | |
def chunk_file(self, file_path: str) -> List[CodeChunk]: | |
"""Chunk a single file and return list of CodeChunk objects""" | |
file_path = Path(file_path) | |
if file_path.suffix not in self.supported_extensions: | |
return [] | |
try: | |
with open(file_path, 'r', encoding='utf-8') as f: | |
content = f.read() | |
except Exception as e: | |
print(f"Error reading file {file_path}: {e}") | |
return [] | |
if file_path.suffix == '.py': | |
return self._chunk_python_file(str(file_path), content) | |
else: | |
# For other languages, use simpler text-based chunking for now | |
return self._chunk_generic_file(str(file_path), content) | |
def _chunk_python_file(self, file_path: str, content: str) -> List[CodeChunk]: | |
"""Chunk Python file using AST""" | |
chunks = [] | |
lines = content.split('\n') | |
try: | |
tree = ast.parse(content) | |
except SyntaxError as e: | |
print(f"Syntax error in {file_path}: {e}") | |
return [] | |
# Track imports at module level | |
imports = [] | |
for node in ast.walk(tree): | |
if isinstance(node, (ast.Import, ast.ImportFrom)): | |
imports.extend(self._extract_imports(node)) | |
# Process top-level nodes | |
for node in tree.body: | |
chunk = self._process_node(node, file_path, lines, imports) | |
if chunk: | |
chunks.append(chunk) | |
return chunks | |
def _process_node(self, node: ast.AST, file_path: str, lines: List[str], | |
imports: List[str], parent_name: str = None) -> Optional[CodeChunk]: | |
"""Process an AST node and create a CodeChunk""" | |
if isinstance(node, ast.FunctionDef): | |
return self._create_function_chunk(node, file_path, lines, imports, parent_name) | |
elif isinstance(node, ast.AsyncFunctionDef): | |
return self._create_function_chunk(node, file_path, lines, imports, parent_name, is_async=True) | |
elif isinstance(node, ast.ClassDef): | |
return self._create_class_chunk(node, file_path, lines, imports) | |
elif isinstance(node, ast.Assign): | |
return self._create_variable_chunk(node, file_path, lines, parent_name) | |
elif isinstance(node, (ast.Import, ast.ImportFrom)): | |
return self._create_import_chunk(node, file_path, lines) | |
return None | |
def _create_function_chunk(self, node: Union[ast.FunctionDef, ast.AsyncFunctionDef], | |
file_path: str, lines: List[str], imports: List[str], | |
parent_name: str = None, is_async: bool = False) -> CodeChunk: | |
"""Create a chunk for a function or method""" | |
# Extract function content | |
start_line = node.lineno | |
end_line = node.end_lineno or start_line | |
content = '\n'.join(lines[start_line-1:end_line]) | |
# Extract parameters | |
parameters = [] | |
for arg in node.args.args: | |
param_str = arg.arg | |
if arg.annotation: | |
param_str += f": {ast.unparse(arg.annotation)}" | |
parameters.append(param_str) | |
# Extract return type | |
return_type = None | |
if node.returns: | |
return_type = ast.unparse(node.returns) | |
# Extract decorators | |
decorators = [] | |
for decorator in node.decorator_list: | |
decorators.append(ast.unparse(decorator)) | |
# Extract docstring | |
docstring = ast.get_docstring(node) | |
# Calculate complexity (simple metric based on control flow) | |
complexity = self._calculate_complexity(node) | |
chunk_type = "method" if parent_name else "function" | |
if is_async: | |
chunk_type = "async_" + chunk_type | |
return CodeChunk( | |
content=content, | |
chunk_type=chunk_type, | |
name=node.name, | |
file_path=file_path, | |
start_line=start_line, | |
end_line=end_line, | |
start_col=node.col_offset, | |
end_col=node.end_col_offset or 0, | |
parent_name=parent_name, | |
docstring=docstring, | |
parameters=parameters, | |
return_type=return_type, | |
decorators=decorators, | |
complexity_score=complexity, | |
dependencies=imports | |
) | |
def _create_class_chunk(self, node: ast.ClassDef, file_path: str, | |
lines: List[str], imports: List[str]) -> CodeChunk: | |
"""Create a chunk for a class""" | |
start_line = node.lineno | |
end_line = node.end_lineno or start_line | |
content = '\n'.join(lines[start_line-1:end_line]) | |
# Extract base classes | |
base_classes = [] | |
for base in node.bases: | |
base_classes.append(ast.unparse(base)) | |
# Extract decorators | |
decorators = [] | |
for decorator in node.decorator_list: | |
decorators.append(ast.unparse(decorator)) | |
# Extract docstring | |
docstring = ast.get_docstring(node) | |
return CodeChunk( | |
content=content, | |
chunk_type="class", | |
name=node.name, | |
file_path=file_path, | |
start_line=start_line, | |
end_line=end_line, | |
start_col=node.col_offset, | |
end_col=node.end_col_offset or 0, | |
docstring=docstring, | |
decorators=decorators, | |
dependencies=imports + base_classes | |
) | |
def _create_variable_chunk(self, node: ast.Assign, file_path: str, | |
lines: List[str], parent_name: str = None) -> Optional[CodeChunk]: | |
"""Create a chunk for variable assignments""" | |
# Only process simple assignments at module level | |
if len(node.targets) == 1 and isinstance(node.targets[0], ast.Name): | |
var_name = node.targets[0].id | |
start_line = node.lineno | |
end_line = node.end_lineno or start_line | |
content = '\n'.join(lines[start_line-1:end_line]) | |
return CodeChunk( | |
content=content, | |
chunk_type="variable", | |
name=var_name, | |
file_path=file_path, | |
start_line=start_line, | |
end_line=end_line, | |
start_col=node.col_offset, | |
end_col=node.end_col_offset or 0, | |
parent_name=parent_name | |
) | |
return None | |
def _create_import_chunk(self, node: Union[ast.Import, ast.ImportFrom], | |
file_path: str, lines: List[str]) -> CodeChunk: | |
"""Create a chunk for import statements""" | |
start_line = node.lineno | |
end_line = node.end_lineno or start_line | |
content = '\n'.join(lines[start_line-1:end_line]) | |
# Extract imported names | |
imported_names = [] | |
if isinstance(node, ast.Import): | |
for alias in node.names: | |
imported_names.append(alias.name) | |
else: # ImportFrom | |
for alias in node.names: | |
imported_names.append(alias.name) | |
return CodeChunk( | |
content=content, | |
chunk_type="import", | |
name=", ".join(imported_names), | |
file_path=file_path, | |
start_line=start_line, | |
end_line=end_line, | |
start_col=node.col_offset, | |
end_col=node.end_col_offset or 0 | |
) | |
def _extract_imports(self, node: Union[ast.Import, ast.ImportFrom]) -> List[str]: | |
"""Extract import names from import nodes""" | |
imports = [] | |
if isinstance(node, ast.Import): | |
for alias in node.names: | |
imports.append(alias.name) | |
else: # ImportFrom | |
module = node.module or "" | |
for alias in node.names: | |
imports.append(f"{module}.{alias.name}" if module else alias.name) | |
return imports | |
def _calculate_complexity(self, node: ast.AST) -> int: | |
"""Calculate cyclomatic complexity of a function""" | |
complexity = 1 # Base complexity | |
for child in ast.walk(node): | |
if isinstance(child, (ast.If, ast.While, ast.For, ast.AsyncFor)): | |
complexity += 1 | |
elif isinstance(child, ast.ExceptHandler): | |
complexity += 1 | |
elif isinstance(child, (ast.ListComp, ast.SetComp, ast.DictComp, ast.GeneratorExp)): | |
complexity += 1 | |
return complexity | |
def _chunk_generic_file(self, file_path: str, content: str) -> List[CodeChunk]: | |
"""Generic chunking for non-Python files""" | |
chunks = [] | |
lines = content.split('\n') | |
# Simple function detection for C/C++/Java/JavaScript | |
function_patterns = { | |
'.js': r'function\s+(\w+)', | |
'.ts': r'function\s+(\w+)', | |
'.java': r'(public|private|protected)?\s*(static)?\s*\w+\s+(\w+)\s*\(', | |
'.cpp': r'\w+\s+(\w+)\s*\(', | |
'.c': r'\w+\s+(\w+)\s*\(', | |
'.h': r'\w+\s+(\w+)\s*\(' | |
} | |
# This is a simplified implementation - you'd want more sophisticated parsing | |
# for production use | |
return chunks | |
def chunk_directory(self, directory_path: str, recursive: bool = True) -> List[CodeChunk]: | |
"""Chunk all supported files in a directory""" | |
all_chunks = [] | |
directory_path = Path(directory_path) | |
if recursive: | |
pattern = "**/*" | |
else: | |
pattern = "*" | |
for file_path in directory_path.glob(pattern): | |
if file_path.is_file() and file_path.suffix in self.supported_extensions: | |
chunks = self.chunk_file(str(file_path)) | |
all_chunks.extend(chunks) | |
self.chunks = all_chunks | |
return all_chunks | |
def save_chunks(self, output_file: str): | |
"""Save chunks to JSON file""" | |
chunks_data = [asdict(chunk) for chunk in self.chunks] | |
with open(output_file, 'w', encoding='utf-8') as f: | |
json.dump(chunks_data, f, indent=2, ensure_ascii=False) | |
def load_chunks(self, input_file: str) -> List[CodeChunk]: | |
"""Load chunks from JSON file""" | |
with open(input_file, 'r', encoding='utf-8') as f: | |
chunks_data = json.load(f) | |
self.chunks = [CodeChunk(**chunk_data) for chunk_data in chunks_data] | |
return self.chunks | |
def get_chunks_by_type(self, chunk_type: str) -> List[CodeChunk]: | |
"""Filter chunks by type""" | |
return [chunk for chunk in self.chunks if chunk.chunk_type == chunk_type] | |
def get_chunks_by_file(self, file_path: str) -> List[CodeChunk]: | |
"""Filter chunks by file path""" | |
return [chunk for chunk in self.chunks if chunk.file_path == file_path] | |
def search_chunks(self, query: str) -> List[CodeChunk]: | |
"""Simple text search in chunks""" | |
results = [] | |
query_lower = query.lower() | |
for chunk in self.chunks: | |
if (query_lower in chunk.content.lower() or | |
query_lower in chunk.name.lower() or | |
(chunk.docstring and query_lower in chunk.docstring.lower())): | |
results.append(chunk) | |
return results | |
# Example usage | |
if __name__ == "__main__": | |
# Initialize chunker | |
chunker = CodeChunker() | |
# Example: Chunk a single Python file | |
# chunks = chunker.chunk_file("example.py") | |
# Example: Chunk entire directory | |
chunks = chunker.chunk_directory("ultralytics", recursive=True) | |
# Example: Save chunks to file | |
chunker.save_chunks("code_chunks.json") | |
# Example: Search chunks | |
# results = chunker.search_chunks("database") | |
# Example: Get all functions | |
# functions = chunker.get_chunks_by_type("function") | |
print("Code chunking system initialized!") | |
print("Supported file extensions:", chunker.supported_extensions) | |
print("\nExample usage:") | |
print("1. chunker.chunk_file('path/to/file.py')") | |
print("2. chunker.chunk_directory('path/to/project', recursive=True)") | |
print("3. chunker.save_chunks('output.json')") | |
print("4. chunker.search_chunks('query')") |