| | |
| | """ |
| | Extract individual functions from enhanced_dataset.csv and create a new dataset. |
| | Each function becomes a separate row in the new dataset. |
| | Version 2: Better handling of malformed CSV/JSON |
| | """ |
| |
|
| | import csv |
| | import json |
| | import re |
| | from collections import defaultdict |
| | import sys |
| |
|
def clean_json_string(json_str):
    """
    Repair a JSON string whose keys may have been damaged by CSV formatting.

    Three passes are applied in order:

    1. Each known key that appears with stray whitespace inside its quotes
       (or, for ``function_name``, with whitespace before the final ``e``)
       is rewritten to its canonical spelling.
    2. Every ``**`` sequence is removed (presumably markdown bold markers;
       confirm against the data source).
    3. Any quoted key made of two or three lowercase/underscore fragments
       separated by optional whitespace and followed by ``:`` has its
       fragments concatenated with no separator.

    Args:
        json_str: The raw, possibly malformed JSON text.

    Returns:
        The cleaned string. It is not guaranteed to be valid JSON.
    """
    # (pattern, canonical replacement) pairs for the known keys.
    key_repairs = (
        (r'"\s*function_nam\s*e\s*"', '"function_name"'),
        (r'"\s*function_start_line\s*"', '"function_start_line"'),
        (r'"\s*function_end_line\s*"', '"function_end_line"'),
        (r'"\s*relevance_score\s*"', '"relevance_score"'),
        (r'"\s*relevance_reason\s*"', '"relevance_reason"'),
        (r'"\s*doc_start_line\s*"', '"doc_start_line"'),
        (r'"\s*doc_end_line\s*"', '"doc_end_line"'),
    )

    cleaned = json_str
    for pattern, canonical in key_repairs:
        cleaned = re.sub(pattern, canonical, cleaned)

    cleaned = cleaned.replace('**', '')

    def _glue_fragments(match):
        # The third group is optional in the pattern and is '' when absent,
        # so joining all groups reproduces "g1 + g2 + g3".
        return '"' + ''.join(match.groups()) + '":'

    cleaned = re.sub(
        r'"\s*([a-z_]+)\s*([a-z_]+)\s*([a-z_]*)\s*":',
        _glue_fragments,
        cleaned,
    )
    return cleaned
| |
|
| |
|
def extract_function_content(text, start_line, end_line):
    """
    Extract function content from text based on line number range.

    Args:
        text: The full code text. ``None`` or an empty string yields ``''``.
        start_line: Starting line number (1-indexed, inclusive). Values
            below 1 are clamped to the first line.
        end_line: Ending line number (1-indexed, inclusive). Values past the
            end of the text are clamped to the last line.

    Returns:
        The selected lines joined with ``'\\n'``, or ``''`` when the text is
        missing or the clamped range is empty (for example ``end_line`` is
        not positive or lies before ``start_line``).
    """
    # A missing text value (None) would otherwise fail on .split().
    if not text:
        return ''

    lines = text.split('\n')

    start_idx = max(0, start_line - 1)
    end_idx = min(len(lines), end_line)

    # Guard explicitly: a negative end index would be treated by slicing as
    # "count from the end" and return unrelated lines.
    if end_idx <= start_idx:
        return ''

    return '\n'.join(lines[start_idx:end_idx])
| |
|
| |
|
def _parse_function_info(raw):
    """Parse a cleaned function_info string as JSON, then as a Python literal; raise ValueError if both fail."""
    import ast  # only this fallback path needs it

    try:
        return json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        pass

    try:
        return ast.literal_eval(raw)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as exc:
        # Only the errors literal_eval raises for malformed input are
        # treated as "unparseable"; KeyboardInterrupt etc. propagate.
        raise ValueError("unparseable function_info") from exc


def _print_score_report(score_distribution, all_function_rows, total_functions):
    """Print the per-bucket score histogram and summary statistics to stdout."""
    print("\n" + "="*70)
    print("SCORE DISTRIBUTION")
    print("="*70)
    print(f"{'Score Range':<15} {'Count':<12} {'Percentage':<12} {'Visualization'}")
    print("-"*70)

    sorted_scores = sorted(score_distribution.items(), reverse=True)

    # Buckets below zero are reported separately from the main histogram.
    normal_scores = [(k, v) for k, v in sorted_scores if k >= 0]
    anomalous_scores = [(k, v) for k, v in sorted_scores if k < 0]

    for score_bucket, count in normal_scores:
        percentage = (count / total_functions * 100) if total_functions > 0 else 0
        bar = '█' * min(50, int(percentage / 2))
        print(f"{score_bucket:>3}-{score_bucket+9:<9} {count:<12} {percentage:>6.2f}% {bar}")

    if anomalous_scores:
        print("\nAnomalous scores (negative or out of range):")
        for score_bucket, count in anomalous_scores:
            percentage = (count / total_functions * 100) if total_functions > 0 else 0
            print(f"{score_bucket:>15} {count:<12} {percentage:>6.2f}%")

    print("-"*70)
    print(f"{'Total':<15} {total_functions:<12} {'100.00%':<12}")
    print("="*70)

    if total_functions > 0:
        # Statistics only consider scores inside the 0-100 range.
        valid_scores = [row['relevance_score'] for row in all_function_rows
                        if 0 <= row['relevance_score'] <= 100]

        if valid_scores:
            avg_score = sum(valid_scores) / len(valid_scores)
            max_score = max(valid_scores)
            min_score = min(valid_scores)

            print("\nScore Statistics (valid scores 0-100 only):")
            print(f" Average Score: {avg_score:.2f}")
            print(f" Maximum Score: {max_score}")
            print(f" Minimum Score: {min_score}")
            print(f" Valid Functions: {len(valid_scores)} / {total_functions}")


def process_dataset(input_file, output_file):
    """
    Process enhanced_dataset.csv and extract functions.

    Each input row's ``function_info`` column is cleaned with
    ``clean_json_string`` and parsed (JSON first, Python literal as a
    fallback) into a list of per-function dicts. Every dict becomes one
    output row carrying the input row's metadata columns, the function's
    line range, score and reason, and the function text sliced out of the
    ``text`` column. Output rows are sorted by ``relevance_score``
    descending before being written, and a score report is printed.

    Rows are skipped (and counted) when ``function_info`` is empty, cannot
    be parsed, or does not parse to a list. List entries that are not dicts
    are ignored.

    Args:
        input_file: Path to enhanced_dataset.csv
        output_file: Path to output CSV file

    Raises:
        FileNotFoundError: If either path cannot be opened because it (or
            its directory) does not exist.
    """
    print(f"Reading from: {input_file}")
    print(f"Writing to: {output_file}")

    total_rows = 0
    total_functions = 0
    score_distribution = defaultdict(int)
    skipped_rows = 0
    parse_errors = 0
    empty_function_info = 0

    fieldnames = [
        'original_index',
        'function_index',
        'repo_name',
        'path',
        'language',
        'license',
        'keyword',
        'text_hash',
        'config',
        'split',
        'repo_path',
        'ds_source',
        'function_name',
        'function_start_line',
        'function_end_line',
        'doc_start_line',
        'doc_end_line',
        'relevance_score',
        'relevance_reason',
        'function_content',
    ]

    with open(input_file, 'r', encoding='utf-8') as infile, \
            open(output_file, 'w', encoding='utf-8', newline='') as outfile:

        reader = csv.DictReader(infile)

        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()

        # All rows are buffered because they are sorted before writing.
        all_function_rows = []

        print("\nProcessing rows...")
        for row in reader:
            total_rows += 1

            if total_rows % 1000 == 0:
                print(f"Processed {total_rows} rows, extracted {total_functions} functions, errors: {parse_errors}...", end='\r')

            function_info_str = row.get('function_info', '[]')
            if not function_info_str or function_info_str.strip() == '':
                empty_function_info += 1
                skipped_rows += 1
                continue

            function_info_str = clean_json_string(function_info_str)

            try:
                function_info_list = _parse_function_info(function_info_str)
            except ValueError:
                parse_errors += 1
                # Only the first five failures are reported individually.
                if parse_errors <= 5:
                    print(f"\nWarning: Failed to parse function_info in row {total_rows}")
                    if parse_errors == 5:
                        print("(Suppressing further parse error messages...)")
                skipped_rows += 1
                continue

            if not isinstance(function_info_list, list):
                skipped_rows += 1
                continue

            # DictReader fills columns missing from a short row with None,
            # so normalise to '' before slicing lines out of it.
            text = row.get('text', '') or ''

            for func_idx, func_info in enumerate(function_info_list):
                if not isinstance(func_info, dict):
                    continue

                start_line = func_info.get('function_start_line', 0)
                end_line = func_info.get('function_end_line', 0)

                # If either bound is unusable, both are reset to 0.
                # OverflowError covers int(float('inf')), reachable because
                # json.loads accepts Infinity.
                try:
                    start_line = int(start_line) if start_line else 0
                    end_line = int(end_line) if end_line else 0
                except (ValueError, TypeError, OverflowError):
                    start_line = 0
                    end_line = 0

                if start_line > 0 and end_line > 0:
                    function_content = extract_function_content(text, start_line, end_line)
                else:
                    function_content = ""

                relevance_score = func_info.get('relevance_score', 0)

                try:
                    relevance_score = int(relevance_score) if relevance_score else 0
                except (ValueError, TypeError, OverflowError):
                    relevance_score = 0

                # Histogram buckets are ten points wide (0, 10, 20, ...).
                score_bucket = (relevance_score // 10) * 10
                score_distribution[score_bucket] += 1

                new_row = {
                    # Falls back to the 0-based position of the input row
                    # when neither index column is present.
                    'original_index': row.get('Unnamed: 0', row.get('Unnamed: 0.1', total_rows - 1)),
                    'function_index': func_idx,
                    'repo_name': row.get('repo_name', ''),
                    'path': row.get('path', ''),
                    'language': row.get('language', ''),
                    'license': row.get('license', ''),
                    'keyword': row.get('keyword', ''),
                    'text_hash': row.get('text_hash', ''),
                    'config': row.get('config', ''),
                    'split': row.get('split', ''),
                    'repo_path': row.get('repo_path', ''),
                    'ds_source': row.get('ds_source', ''),
                    'function_name': func_info.get('function_name', ''),
                    'function_start_line': start_line,
                    'function_end_line': end_line,
                    'doc_start_line': func_info.get('doc_start_line', ''),
                    'doc_end_line': func_info.get('doc_end_line', ''),
                    'relevance_score': relevance_score,
                    'relevance_reason': func_info.get('relevance_reason', ''),
                    'function_content': function_content,
                }

                all_function_rows.append(new_row)
                total_functions += 1

        print(f"\n\nTotal rows processed: {total_rows}")
        print(f"Total functions extracted: {total_functions}")
        print("Skipped rows:")
        print(f" - Empty function_info: {empty_function_info}")
        print(f" - Parse errors: {parse_errors}")
        print(f" - Total skipped: {skipped_rows}")

        print("\nSorting by relevance score...")
        all_function_rows.sort(key=lambda x: x['relevance_score'], reverse=True)

        print("Writing sorted data to output file...")
        writer.writerows(all_function_rows)

    print(f"\nSuccessfully written {total_functions} functions to {output_file}")

    _print_score_report(score_distribution, all_function_rows, total_functions)
| |
|
| |
|
if __name__ == "__main__":
    # Defaults, overridable positionally: script.py [input_file [output_file]]
    input_file = "enhanced_dataset.csv"
    output_file = "function_dataset_v2.csv"

    if len(sys.argv) > 1:
        input_file = sys.argv[1]
    if len(sys.argv) > 2:
        output_file = sys.argv[2]

    try:
        process_dataset(input_file, output_file)
        print("\n✅ Processing complete!")
    except FileNotFoundError as e:
        # The missing path may be the output location (e.g. a non-existent
        # directory), so report the path the OS actually complained about
        # and fall back to the input path only if none was recorded.
        missing_path = e.filename if e.filename is not None else input_file
        print(f"❌ Error: File '{missing_path}' not found.")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: show the message and full traceback, then
        # exit non-zero so callers can detect the failure.
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
| |
|