import ast
import os

from langchain.schema import Document


def chunk_python_code_with_metadata(source_code, reference):
    """
    Entry point for processing a Python file.

    Delegates parsing to iterate_ast and then tags every resulting
    Document with its reference path and a usage category derived
    from the file's location in the repository.
    """
    documents = []
    print(f"Processing file: {reference}")

    iterate_ast(source_code, documents, reference)

    # Classify the file by its location in the repository layout.
    if reference.startswith("kadi_apy/lib/"):
        usage = "library"
    elif reference.startswith("kadi_apy/cli/"):
        usage = "cli_library"
    elif reference.startswith("doc/"):
        usage = "doc"
    else:
        usage = "undefined"

    for doc in documents:
        doc.metadata["reference"] = reference
        doc.metadata["usage"] = usage

    return documents
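
# Example: for a hypothetical file kadi_apy/lib/resources.py containing one
# top-level public function, the call below would yield one Document whose
# metadata combines the per-chunk fields with the file-level ones, roughly:
#
#   docs = chunk_python_code_with_metadata(source, "kadi_apy/lib/resources.py")
#   docs[0].metadata
#   # {'type': 'function', 'method': '...', 'visibility': 'public',
#   #  'imports': [...], 'reference': 'kadi_apy/lib/resources.py',
#   #  'usage': 'library'}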


def iterate_ast(source_code, documents, reference):
    """
    Parses the AST of the given Python file and delegates
    handling to specific methods based on node types.
    """
    tree = ast.parse(source_code, filename=reference)

    imports_dict = extract_imports(tree)

    first_level_nodes = list(ast.iter_child_nodes(tree))

    # No top-level nodes at all, e.g. an empty or comment-only file.
    if not first_level_nodes:
        handle_no_first_level_node_found(documents, source_code, imports_dict, reference)
        return

    # A module consisting solely of imports is stored as a single chunk.
    all_imports = all(isinstance(node, (ast.Import, ast.ImportFrom)) for node in first_level_nodes)
    if all_imports:
        handle_first_level_imports_only(documents, source_code, imports_dict, reference)
        return

    for first_level_node in first_level_nodes:
        if isinstance(first_level_node, ast.ClassDef):
            handle_first_level_class(first_level_node, documents, source_code, imports_dict)
        elif isinstance(first_level_node, ast.FunctionDef):
            handle_first_level_func(first_level_node, documents, source_code, imports_dict)
        elif isinstance(first_level_node, ast.Assign):
            handle_first_level_assign(first_level_node, documents, source_code, imports_dict)


def handle_first_level_imports_only(documents, source_code, imports_dict, reference):
    """
    Handles modules that contain nothing but import statements by storing
    the full source in a single Document object.
    """
    # Use doc_type rather than shadowing the built-in `type`.
    if reference.endswith("__init__.py"):
        doc_type = "__init__-file"
    else:
        doc_type = "undefined"

    doc = Document(
        page_content=source_code,
        metadata={
            "type": doc_type,
            "imports": imports_dict,
        },
    )
    documents.append(doc)


def extract_imports(tree):
    """
    Extracts all import statements from the AST tree and organizes them
    into a dictionary mapping local names to their fully qualified names
    for later analysis.
    """
    imports_dict = {}
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                imports_dict[alias.name] = alias.name
        elif isinstance(node, ast.ImportFrom):
            module = node.module if node.module else ""
            for alias in node.names:
                full_name = f"{module}.{alias.name}" if module else alias.name
                imports_dict[alias.name] = full_name
    return imports_dict
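
# Sketch of what extract_imports produces, assuming a module containing
# `import os` and `from kadi_apy.lib import Resource` (hypothetical names):
#
#   extract_imports(ast.parse("import os\nfrom kadi_apy.lib import Resource"))
#   # {'os': 'os', 'Resource': 'kadi_apy.lib.Resource'}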


def analyze_imports(node, imports_dict):
    """
    Analyzes the given node's body and signature to find relevant imports.
    """
    relevant_imports = set()
    for sub_node in ast.walk(node):
        if isinstance(sub_node, ast.Name) and sub_node.id in imports_dict:
            relevant_imports.add(imports_dict[sub_node.id])
    return list(relevant_imports)
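
# Sketch: for a function whose body calls Resource(...), combined with the
# imports_dict from the example above, analyze_imports would return
# ['kadi_apy.lib.Resource'], since only names present in imports_dict
# are collected.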


def handle_not_yet_defined_first_level_cases(documents, source_code, imports_dict):
    """
    Fallback for top-level node types without a dedicated handler:
    stores the remaining source, if any, in a Document object.
    """
    if source_code:
        doc = Document(
            page_content=source_code,
            metadata={
                "type": "undefined",
                "imports": imports_dict,
            },
        )
        documents.append(doc)


def handle_no_first_level_node_found(documents, source_code, imports_dict, reference):
    """
    Handles cases where no top-level nodes are found in the AST.
    Stores the full content (likely comments) in a Document object
    whose type is '__init__-file' or 'undefined', based on the reference.
    """
    # Use doc_type rather than shadowing the built-in `type`.
    if reference.endswith("__init__.py"):
        doc_type = "__init__-file"
    else:
        doc_type = "undefined"

    doc = Document(
        page_content=source_code,
        metadata={
            "type": doc_type,
            "imports": imports_dict,
        },
    )
    documents.append(doc)


def handle_first_level_assign(assign_node, documents, source_code, imports_dict):
    """
    Handles assignment statements at the first level of the AST by storing
    them in a Document object with metadata, including relevant imports.
    """
    # Extract the assignment's exact source lines (lineno is 1-based).
    assign_start_line = assign_node.lineno
    assign_end_line = assign_node.end_lineno
    assign_source = '\n'.join(source_code.splitlines()[assign_start_line - 1:assign_end_line])

    assign_imports = analyze_imports(assign_node, imports_dict)

    doc = Document(
        page_content=assign_source,
        metadata={
            "type": "Assign",
            "imports": assign_imports,
        },
    )
    documents.append(doc)


def handle_first_level_class(class_node, documents, source_code, imports_dict):
    """
    Handles classes at the first level of the AST by storing them
    in a Document object with metadata, including relevant imports.
    """
    class_start_line = class_node.lineno

    # The class chunk covers the header, docstring, and class-level
    # attributes, ending just before the first method definition; if the
    # class has no methods, it covers the whole class body.
    class_body_lines = [child.lineno for child in class_node.body if isinstance(child, ast.FunctionDef)]
    class_end_line = min(class_body_lines) - 1 if class_body_lines else class_node.end_lineno

    class_source = '\n'.join(source_code.splitlines()[class_start_line - 1:class_end_line])

    class_imports = analyze_imports(class_node, imports_dict)

    doc = Document(
        page_content=class_source,
        metadata={
            "type": "class",
            "class": class_node.name,
            "visibility": "public",
            "imports": class_imports,
        },
    )
    documents.append(doc)

    # Each method becomes its own chunk, including any decorators.
    for second_level_node in ast.iter_child_nodes(class_node):
        if isinstance(second_level_node, ast.FunctionDef):
            method_start_line = (
                second_level_node.decorator_list[0].lineno
                if second_level_node.decorator_list else second_level_node.lineno
            )
            method_end_line = second_level_node.end_lineno
            method_source = '\n'.join(source_code.splitlines()[method_start_line - 1:method_end_line])

            # Leading underscores mark internal methods.
            visibility = "internal" if second_level_node.name.startswith("_") else "public"

            method_imports = analyze_imports(second_level_node, imports_dict)

            doc = Document(
                page_content=method_source,
                metadata={
                    "type": "method",
                    "method": second_level_node.name,
                    "visibility": visibility,
                    "imports": method_imports,
                    "class": class_node.name,
                },
            )
            documents.append(doc)


def handle_first_level_func(function_node, documents, source_code, imports_dict):
    """
    Handles functions at the first level of the AST by storing them
    in a Document object with metadata, including relevant imports.
    """
    # Include any decorators in the chunk.
    function_start_line = (
        function_node.decorator_list[0].lineno
        if function_node.decorator_list else function_node.lineno
    )
    function_end_line = function_node.end_lineno
    function_source = '\n'.join(source_code.splitlines()[function_start_line - 1:function_end_line])

    # Leading underscores mark internal functions.
    visibility = "internal" if function_node.name.startswith("_") else "public"

    # Treat functions decorated with @apy_command as CLI commands. Note that
    # the hasattr check only matches bare name decorators (ast.Name nodes),
    # not decorator calls such as @apy_command(...), which are ast.Call nodes.
    is_command = any(
        decorator.id == "apy_command"
        for decorator in function_node.decorator_list
        if hasattr(decorator, "id")
    )

    function_imports = analyze_imports(function_node, imports_dict)

    if is_command:
        doc = Document(
            page_content=function_source,
            metadata={
                "type": "command",
                "command": function_node.name,
                "visibility": "public",
                "imports": function_imports,
            },
        )
    else:
        doc = Document(
            page_content=function_source,
            metadata={
                "type": "function",
                "method": function_node.name,
                "visibility": visibility,
                "imports": function_imports,
            },
        )
    documents.append(doc)
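
# Sketch: a top-level function decorated with a bare @apy_command would be
# chunked with type 'command', while an undecorated helper named _parse
# (hypothetical) would instead get {'type': 'function', 'method': '_parse',
# 'visibility': 'internal', ...}.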


def process_folder(folder_path):
    """
    Walks the given folder, chunks every Python file it finds, and reports
    how many files and documents were processed.
    """
    python_file_count = 0
    all_docs = []

    for root, _, files in os.walk(folder_path):
        for file_name in files:
            file_path = os.path.join(root, file_name)

            if file_name.endswith(".py"):
                python_file_count += 1
                with open(file_path, "r", encoding="utf-8") as file:
                    source_code = file.read()

                docs = chunk_python_code_with_metadata(source_code, file_path)
                print(f"Chunks created for {file_name}: {len(docs)}")
                all_docs.extend(docs)

    print(f"Total Python files processed: {python_file_count}")
    print(f"Total documents created: {len(all_docs)}")
    return all_docs
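

# Minimal usage sketch, assuming the kadi_apy repository has been checked out
# locally; the path below is a placeholder, not part of the original code.
if __name__ == "__main__":
    all_documents = process_folder("kadi_apy")
    print(f"Collected {len(all_documents)} documents in total")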