KadiAPY_Coding_Assistant / chunk_python_code
bupa1018's picture
Rename process_python_code to chunk_python_code
3292d2b
raw
history blame
11.2 kB
import ast
from langchain.schema import Document
def chunk_python_code_with_metadata(source_code, reference):
    """
    Chunk a Python source file into Document objects with metadata.

    Delegates the AST traversal to iterate_ast, then tags every produced
    Document with the file reference and a usage category derived from
    the reference path prefix.
    """
    documents = []
    print(f"Processing file: {reference}")
    iterate_ast(source_code, documents, reference)

    # Map the reference path prefix to a usage category.
    prefix_to_usage = (
        ("kadi_apy/lib/", "library"),
        ("kadi_apy/cli/", "cli_library"),
        ("doc/", "doc"),
    )
    usage = next(
        (label for prefix, label in prefix_to_usage if reference.startswith(prefix)),
        "undefined",
    )

    # Stamp every chunk with its origin and usage category.
    for doc in documents:
        doc.metadata["reference"] = reference
        doc.metadata["usage"] = usage
    return documents
def iterate_ast(source_code, documents, reference):
    """
    Walk the top level of the module's AST and dispatch each node to the
    handler for its node type, appending Documents as it goes.
    """
    tree = ast.parse(source_code, filename=reference)

    # Collect all imports up front so handlers can attach relevant ones.
    imports_dict = extract_imports(tree)
    top_nodes = list(ast.iter_child_nodes(tree))

    # An empty module (e.g. comments only) is stored as a single chunk.
    if not top_nodes:
        handle_no_first_level_node_found(documents, source_code, imports_dict, reference)
        return

    # A module consisting purely of imports is also stored whole.
    if all(isinstance(node, (ast.Import, ast.ImportFrom)) for node in top_nodes):
        handle_first_level_imports_only(documents, source_code, imports_dict, reference)

    # Dispatch table replaces the original isinstance chain; the handled
    # node classes have no AST subclasses, so type-keying is equivalent.
    handlers = {
        ast.ClassDef: handle_first_level_class,
        ast.FunctionDef: handle_first_level_func,
        ast.Assign: handle_first_level_assign,
    }
    for top_node in top_nodes:
        handler = handlers.get(type(top_node))
        if handler is not None:
            handler(top_node, documents, source_code, imports_dict)
def handle_first_level_imports_only(documents, source_code, imports_dict, reference):
    """
    Store an imports-only module as a single Document covering the full
    source. The chunk type is "__init__-file" for package init modules
    and "undefined" otherwise.
    """
    doc_type = "__init__-file" if reference.endswith("__init__.py") else "undefined"
    documents.append(
        Document(
            page_content=source_code,
            metadata={
                "type": doc_type,
                "imports": imports_dict,
            },
        )
    )
def extract_imports(tree):
    """
    Collect every import in the tree into a dict that maps the imported
    name to its fully qualified module path.

    Note: ast.walk visits the whole tree, so nested (function-level)
    imports are included as well.
    """
    imports_dict = {}
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            # `import a.b` binds and records "a.b" -> "a.b".
            imports_dict.update({alias.name: alias.name for alias in node.names})
        elif isinstance(node, ast.ImportFrom):
            # `from pkg import x` records "x" -> "pkg.x"; a relative
            # `from . import x` has no module and records "x" -> "x".
            prefix = node.module or ""
            for alias in node.names:
                qualified = f"{prefix}.{alias.name}" if prefix else alias.name
                imports_dict[alias.name] = qualified
    return imports_dict
def analyze_imports(node, imports_dict):
    """
    Return the fully qualified names of all imports that are referenced
    by name anywhere inside the given AST node.
    """
    referenced = {
        imports_dict[sub.id]
        for sub in ast.walk(node)
        if isinstance(sub, ast.Name) and sub.id in imports_dict
    }
    return list(referenced)
def handle_not_yet_defined_first_level_cases(documents, source_code, imports_dict):
    """
    Fallback handler: wrap any non-empty source in a Document of type
    "undefined" so the content is not silently dropped.
    """
    if not source_code:
        return
    documents.append(
        Document(
            page_content=source_code,
            metadata={
                "type": "undefined",
                "imports": imports_dict,
            },
        )
    )
def handle_no_first_level_node_found(documents, source_code, imports_dict, reference):
    """
    Handles cases where no top-level nodes are found in the AST
    (e.g. a file containing only comments).

    Stores the full content in a single Document whose type is
    "__init__-file" for package init modules and "undefined" otherwise.
    The behavior is identical to the imports-only case, so the work is
    delegated to handle_first_level_imports_only instead of duplicating
    its body verbatim as before.
    """
    handle_first_level_imports_only(documents, source_code, imports_dict, reference)
def handle_first_level_assign(assign_node, documents, source_code, imports_dict):
    """
    Chunk a top-level assignment statement into a Document with metadata,
    including the imports it references.
    """
    # Slice the assignment's exact line span out of the source.
    lines = source_code.splitlines()
    assign_source = '\n'.join(lines[assign_node.lineno - 1:assign_node.end_lineno])

    documents.append(
        Document(
            page_content=assign_source,
            metadata={
                "type": "Assign",
                "imports": analyze_imports(assign_node, imports_dict),
            },
        )
    )
def handle_first_level_class(class_node, documents, source_code, imports_dict):
    """
    Handles classes at the first level of the AST.

    Emits one Document for the class header (everything from the class
    line up to, but excluding, its first method) and one Document per
    method defined directly on the class, each tagged with relevant
    imports and visibility.
    """
    source_lines = source_code.splitlines()

    # The class chunk ends just before the first method; a class with no
    # methods spans its whole body.
    method_linenos = [
        child.lineno for child in class_node.body if isinstance(child, ast.FunctionDef)
    ]
    class_end_line = min(method_linenos, default=class_node.end_lineno) - 1
    class_source = '\n'.join(source_lines[class_node.lineno - 1:class_end_line])

    documents.append(Document(
        page_content=class_source,
        metadata={
            "type": "class",
            "class": class_node.name,
            "visibility": "public",
            "imports": analyze_imports(class_node, imports_dict),
        }
    ))

    # One Document per method, decorators included in the chunk.
    for method_node in ast.iter_child_nodes(class_node):
        if not isinstance(method_node, ast.FunctionDef):
            continue
        method_start_line = (
            method_node.decorator_list[0].lineno
            if method_node.decorator_list else method_node.lineno
        )
        method_source = '\n'.join(
            source_lines[method_start_line - 1:method_node.end_lineno]
        )

        # A leading underscore marks an internal method.
        visibility = "internal" if method_node.name.startswith("_") else "public"

        documents.append(Document(
            page_content=method_source,
            metadata={
                "type": "method",
                "method": method_node.name,
                # Bug fix: store the computed visibility value; the
                # original stored the literal string "visibility".
                "visibility": visibility,
                "imports": analyze_imports(method_node, imports_dict),
                "class": class_node.name
            }
        ))
def handle_first_level_func(function_node, documents, source_code, imports_dict):
    """
    Chunk a top-level function into a Document with metadata, including
    the imports it references.

    Functions decorated with a bare @apy_command decorator are tagged as
    CLI commands; all others are tagged as plain functions.
    """
    # Slice the function (decorators included) out of the source.
    lines = source_code.splitlines()
    start_line = (
        function_node.decorator_list[0].lineno
        if function_node.decorator_list else function_node.lineno
    )
    function_source = '\n'.join(lines[start_line - 1:function_node.end_lineno])

    # Detect a plain-name @apy_command decorator (only ast.Name nodes
    # carry an `id` attribute, matching the original hasattr guard).
    is_command = any(
        getattr(decorator, "id", None) == "apy_command"
        for decorator in function_node.decorator_list
    )

    relevant_imports = analyze_imports(function_node, imports_dict)

    if is_command:
        metadata = {
            "type": "command",
            "command": function_node.name,
            "visibility": "public",
            "imports": relevant_imports
        }
    else:
        # A leading underscore marks an internal function.
        metadata = {
            "type": "function",
            "method": function_node.name,
            "visibility": "internal" if function_node.name.startswith("_") else "public",
            "imports": relevant_imports
        }

    documents.append(Document(page_content=function_source, metadata=metadata))
# Example usage
#file_path = r"C:\Users\Anwender\Downloads\exampleScript.py"
#with open(file_path, "r", encoding="utf-8") as file:
#    source_code = file.read()
#chunk_python_code_with_metadata(source_code, file_path)
import os
def process_folder(folder_path):
    """
    Recursively chunk every ``.py`` file under *folder_path*.

    Walks the folder tree, chunks each Python file via
    chunk_python_code_with_metadata, prints progress counters, and
    returns the combined list of Documents (the original discarded it).
    """
    python_file_count = 0
    all_docs = []

    for root, _, files in os.walk(folder_path):
        for file_name in files:
            # Only Python sources are chunked.
            if not file_name.endswith(".py"):
                continue
            file_path = os.path.join(root, file_name)
            python_file_count += 1
            with open(file_path, "r", encoding="utf-8") as file:
                source_code = file.read()
            print(file_name)
            # Bug fix: the original called the undefined name
            # ``chunkPythonFiles`` (the function was renamed).
            docs = chunk_python_code_with_metadata(source_code, file_path)
            # Replaces the leftover gibberish debug print.
            print(f"Chunks produced for {file_name}: {len(docs)}")
            all_docs.extend(docs)

    print(f"Total Python files processed: {python_file_count}")
    print(f"Total docs files processed: {len(all_docs)}")
    return all_docs