| """ |
| Repository File Type Chunker - Universal chunker for all file types. |
| |
| This module provides file-type-aware chunking for repositories, handling |
| everything from Python code to configuration files, documentation, and |
| special files. It's the universal interface that delegates to specialized |
| chunkers based on file type. |
| |
| ARCHITECTURE POSITION: |
| - File Type Dispatcher: Routes files to appropriate chunkers |
| - Universal Interface: Single entry point for all file types |
| - Metadata Enricher: Adds repository context to all chunks |
| |
| KEY FEATURES: |
| 1. File type detection and intelligent routing |
| 2. Hierarchical chunking for Python files |
| 3. Documentation chunking for markdown/RST |
| 4. Configuration file handling (JSON/YAML/TOML) |
| 5. Special file handling (README, requirements.txt, Dockerfile) |
| 6. Binary file detection and skipping |
| |
| FILE TYPE SUPPORT: |
| - .py: HierarchicalChunker (AST + Tree-sitter) |
| - .md/.mdx/.rst: Documentation chunker |
| - .json/.yaml/.toml: Configuration chunker |
| - requirements.txt/Dockerfile: Special chunker |
| - .txt/.cfg/.ini: Text chunker |
| - README/LICENSE: Documentation chunker |
| - Others: Text chunker with binary detection |
| |
| DATA FLOW: |
| File → Type detection → Route to specialized chunker → |
| Add repo metadata → Return chunks |
| |
| USAGE: |
| chunker = RepoChunker() |
| chunks = chunker.chunk_file(Path("file.py"), repo_metadata) |
| """ |
|
|
| from pathlib import Path |
| from typing import List, Dict, Optional, cast |
| import json |
| import yaml |
| import re |
| import hashlib |
| from .hierarchical_chunker import HierarchicalChunker |
| from .chunk_schema import CodeChunk, ChunkAST, ChunkSpan, ChunkHierarchy, ChunkType, ASTSymbolType |
| from .doc_chunker import chunk_document as chunk_markdown_file |
|
|
|
|
| class RepoChunker: |
| """ |
| Repository chunker that handles ALL file types with proper structure |
| """ |
| |
| def __init__(self, use_hierarchical: bool = True): |
| if use_hierarchical: |
| self.hierarchical_chunker = HierarchicalChunker() |
| self.use_hierarchical = use_hierarchical |
| |
| def _generate_stable_id(self, content: str, prefix: str = "stable") -> str: |
| """ |
| Generate deterministic chunk ID using SHA256. |
| |
| IMPORTANT: This ensures IDs are stable across runs, processes, |
| and Python versions - crucial for RAG reproducibility. |
| |
| Args: |
| content: The text content to hash |
| prefix: ID prefix (config, doc, text, etc.) |
| |
| Returns: |
| Deterministic ID like "config_8a3b2c1d" |
| """ |
| |
| hash_digest = hashlib.sha256(content.encode("utf-8")).hexdigest()[:8] |
| return f"{prefix}_{hash_digest}" |
| |
| def chunk_file(self, file_path: Path, repo_metadata: Optional[Dict] = None) -> List[CodeChunk]: |
| """ |
| Chunk ANY file type with repository context |
| |
| Args: |
| file_path: Path to the file |
| repo_metadata: Optional dict with repo metadata |
| """ |
| suffix = file_path.suffix.lower() |
| |
| |
| if suffix == '.py': |
| return self._chunk_python_file(file_path, repo_metadata) |
| |
| |
| elif suffix in ['.md', '.mdx', '.rst']: |
| return self._chunk_markdown_file_wrapper(file_path, repo_metadata) |
| |
| |
| elif suffix == '.json': |
| return self._chunk_json_file(file_path, repo_metadata) |
| |
| |
| elif suffix in ['.yaml', '.yml', '.toml']: |
| return self._chunk_config_file(file_path, repo_metadata) |
| |
| |
| elif file_path.name.lower() in ['requirements.txt', 'dockerfile', 'docker-compose.yml']: |
| return self._chunk_special_file(file_path, repo_metadata) |
| |
| |
| elif suffix in ['.txt', '.cfg', '.ini', '.conf']: |
| return self._chunk_text_file(file_path, repo_metadata) |
| |
| |
| elif file_path.name.lower() in ['readme', 'readme.md', 'license', 'license.txt', 'license.md']: |
| return self._chunk_readme_file(file_path, repo_metadata) |
| |
| |
| else: |
| return self._chunk_other_file(file_path, repo_metadata) |
| |
| def _chunk_python_file(self, file_path: Path, repo_metadata: Optional[Dict]) -> List[CodeChunk]: |
| """Use our hierarchical chunker for Python files""" |
| try: |
| if self.use_hierarchical: |
| chunks = self.hierarchical_chunker.chunk_file(file_path) |
| else: |
| |
| return self._chunk_text_file(file_path, repo_metadata) |
| |
| |
| if repo_metadata: |
| for chunk in chunks: |
| if "repo_info" not in chunk.metadata: |
| chunk.metadata["repo_info"] = {} |
| chunk.metadata["repo_info"].update(repo_metadata) |
| |
| return chunks |
| |
| except Exception as e: |
| print(f"[ERROR] Error chunking Python file {file_path}: {e}") |
| return self._chunk_text_file(file_path, repo_metadata) |
| |
| def _chunk_markdown_file_wrapper(self, file_path: Path, repo_metadata: Optional[Dict]) -> List[CodeChunk]: |
| """Chunk markdown files using our doc_chunker""" |
| try: |
| content = file_path.read_text(encoding='utf-8', errors='ignore') |
| |
| |
| doc_chunks = chunk_markdown_file( |
| content, |
| source_name=file_path.name, |
| source_url=f"file://{file_path}" |
| ) |
| |
| |
| code_chunks = [] |
| for doc_chunk in doc_chunks: |
| code_chunk = CodeChunk( |
| chunk_id=doc_chunk["chunk_id"], |
| file_path=str(file_path), |
| language=doc_chunk.get("language", "markdown"), |
| chunk_type="documentation", |
| code=doc_chunk["content"], |
| ast=ChunkAST( |
| symbol_type="documentation", |
| name=file_path.name, |
| parent=None, |
| docstring=None |
| ), |
| span=ChunkSpan( |
| start_line=doc_chunk.get("metadata", {}).get("line_start", 1), |
| end_line=doc_chunk.get("metadata", {}).get("line_end", 1) |
| ), |
| metadata={ |
| "doc_chunk_type": doc_chunk.get("chunk_type", "text"), |
| "repo_info": repo_metadata or {}, |
| **doc_chunk.get("metadata", {}) |
| }, |
| hierarchy=ChunkHierarchy( |
| is_primary=True, |
| is_extracted=False, |
| depth=0 |
| ) |
| ) |
| code_chunks.append(code_chunk) |
| |
| return code_chunks |
| |
| except Exception as e: |
| print(f"[ERROR] Error chunking markdown file {file_path}: {e}") |
| return self._chunk_text_file(file_path, repo_metadata) |
| |
| def _chunk_json_file(self, file_path: Path, repo_metadata: Optional[Dict]) -> List[CodeChunk]: |
| """Chunk JSON config files""" |
| try: |
| content = file_path.read_text(encoding='utf-8', errors='ignore') |
| data = json.loads(content) |
| |
| pretty_content = json.dumps(data, indent=2) |
| |
| |
| chunk = CodeChunk( |
| chunk_id=self._generate_stable_id(pretty_content, "config"), |
| file_path=str(file_path), |
| language="json", |
| chunk_type="configuration", |
| code=pretty_content, |
| ast=ChunkAST( |
| symbol_type="configuration", |
| name=file_path.name, |
| parent=None, |
| docstring=None |
| ), |
| span=ChunkSpan( |
| start_line=1, |
| end_line=len(pretty_content.split('\n')) |
| ), |
| metadata={ |
| "file_type": "json_config", |
| "config_keys": list(data.keys()) if isinstance(data, dict) else [], |
| "repo_info": repo_metadata or {} |
| }, |
| hierarchy=ChunkHierarchy( |
| is_primary=True, |
| is_extracted=False, |
| depth=0 |
| ) |
| ) |
| |
| return [chunk] |
| |
| except Exception as e: |
| print(f"[ERROR] Error chunking JSON file {file_path}: {e}") |
| return self._chunk_text_file(file_path, repo_metadata) |
| |
| def _chunk_config_file(self, file_path: Path, repo_metadata: Optional[Dict]) -> List[CodeChunk]: |
| """Chunk YAML/TOML config files""" |
| try: |
| content = file_path.read_text(encoding='utf-8', errors='ignore') |
| suffix = file_path.suffix.lower() |
| |
| language = "yaml" if suffix in ['.yaml', '.yml'] else "toml" |
| |
| |
| chunk = CodeChunk( |
| chunk_id=self._generate_stable_id(content, "config"), |
| file_path=str(file_path), |
| language=language, |
| chunk_type="configuration", |
| code=content, |
| ast=ChunkAST( |
| symbol_type="configuration", |
| name=file_path.name, |
| parent=None, |
| docstring=None |
| ), |
| span=ChunkSpan( |
| start_line=1, |
| end_line=len(content.split('\n')) |
| ), |
| metadata={ |
| "file_type": f"{language}_config", |
| "repo_info": repo_metadata or {} |
| }, |
| hierarchy=ChunkHierarchy( |
| is_primary=True, |
| is_extracted=False, |
| depth=0 |
| ) |
| ) |
| |
| return [chunk] |
| |
| except Exception as e: |
| print(f"[ERROR] Error chunking config file {file_path}: {e}") |
| return self._chunk_text_file(file_path, repo_metadata) |
| |
| def _chunk_special_file(self, file_path: Path, repo_metadata: Optional[Dict]) -> List[CodeChunk]: |
| """Chunk special files (requirements.txt, Dockerfile, etc.)""" |
| try: |
| content = file_path.read_text(encoding='utf-8', errors='ignore') |
| file_name = file_path.name.lower() |
| |
| if 'requirements' in file_name: |
| language = "requirements" |
| chunk_type = "configuration" |
| prefix = "config" |
| elif 'docker' in file_name: |
| language = "dockerfile" |
| chunk_type = "script" |
| prefix = "script" |
| else: |
| language = "text" |
| chunk_type = "text" |
| prefix = "text" |
| |
| |
| chunk = CodeChunk( |
| chunk_id=self._generate_stable_id(content, prefix), |
| file_path=str(file_path), |
| language=language, |
| chunk_type=chunk_type, |
| code=content, |
| ast=ChunkAST( |
| symbol_type=chunk_type, |
| name=file_path.name, |
| parent=None, |
| docstring=None |
| ), |
| span=ChunkSpan( |
| start_line=1, |
| end_line=len(content.split('\n')) |
| ), |
| metadata={ |
| "file_type": file_name, |
| "repo_info": repo_metadata or {}, |
| "dependencies": self._extract_dependencies(content) if "requirements" in file_name else [] |
| }, |
| hierarchy=ChunkHierarchy( |
| is_primary=True, |
| is_extracted=False, |
| depth=0 |
| ) |
| ) |
| |
| return [chunk] |
| |
| except Exception as e: |
| print(f"[ERROR] Error chunking special file {file_path}: {e}") |
| return self._chunk_text_file(file_path, repo_metadata) |
| |
| def _chunk_text_file(self, file_path: Path, repo_metadata: Optional[Dict]) -> List[CodeChunk]: |
| """Chunk plain text files""" |
| try: |
| content = file_path.read_text(encoding='utf-8', errors='ignore') |
| |
| |
| if len(content.split('\n')) <= 200: |
| chunks = [self._create_text_chunk(content, file_path, repo_metadata)] |
| else: |
| |
| chunks = [] |
| lines = content.split('\n') |
| chunk_size = 100 |
| |
| for i in range(0, len(lines), chunk_size): |
| chunk_lines = lines[i:i + chunk_size] |
| chunk_content = '\n'.join(chunk_lines) |
| |
| chunk = self._create_text_chunk( |
| chunk_content, |
| file_path, |
| repo_metadata, |
| chunk_index=i // chunk_size |
| ) |
| chunks.append(chunk) |
| |
| return chunks |
| |
| except Exception as e: |
| print(f"[ERROR] Error reading text file {file_path}: {e}") |
| return [] |
| |
| def _chunk_readme_file(self, file_path: Path, repo_metadata: Optional[Dict]) -> List[CodeChunk]: |
| """Special handling for README/LICENSE files""" |
| try: |
| content = file_path.read_text(encoding='utf-8', errors='ignore') |
| file_name_lower = file_path.name.lower() |
| |
| |
| if 'readme' in file_name_lower: |
| prefix = "doc" |
| elif 'license' in file_name_lower: |
| prefix = "license" |
| else: |
| prefix = "doc" |
| |
| |
| chunk = CodeChunk( |
| chunk_id=self._generate_stable_id(content, prefix), |
| file_path=str(file_path), |
| language="markdown" if file_path.suffix in ['.md', '.mdx'] else "text", |
| chunk_type="documentation", |
| code=content, |
| ast=ChunkAST( |
| symbol_type="documentation", |
| name=file_path.name, |
| parent=None, |
| docstring=None |
| ), |
| span=ChunkSpan( |
| start_line=1, |
| end_line=len(content.split('\n')) |
| ), |
| metadata={ |
| "file_type": "readme_license", |
| "is_readme": "readme" in file_name_lower, |
| "is_license": "license" in file_name_lower, |
| "repo_info": repo_metadata or {} |
| }, |
| hierarchy=ChunkHierarchy( |
| is_primary=True, |
| is_extracted=False, |
| depth=0 |
| ) |
| ) |
| |
| return [chunk] |
| |
| except Exception as e: |
| print(f"[ERROR] Error chunking README file {file_path}: {e}") |
| return self._chunk_text_file(file_path, repo_metadata) |
| |
| def _chunk_other_file(self, file_path: Path, repo_metadata: Optional[Dict]) -> List[CodeChunk]: |
| """Fallback for unknown file types (binary or unsupported)""" |
| try: |
| |
| content = file_path.read_text(encoding='utf-8', errors='ignore') |
| |
| |
| if self._looks_like_binary(content): |
| print(f"[SKIPPED] Skipping binary file: {file_path}") |
| return [] |
| |
| |
| return self._chunk_text_file(file_path, repo_metadata) |
| |
| except UnicodeDecodeError: |
| print(f"[SKIPPED] Skipping binary file: {file_path}") |
| return [] |
| except Exception as e: |
| print(f"[ERROR] Error with file {file_path}: {e}") |
| return [] |
| |
| def _create_text_chunk(self, content: str, file_path: Path, |
| repo_metadata: Optional[Dict], chunk_index: int = 0) -> CodeChunk: |
| """Helper to create a text chunk""" |
| lines = content.split('\n') |
| |
| |
| id_payload = f"{content}_{chunk_index}" |
| |
| return CodeChunk( |
| chunk_id=self._generate_stable_id(id_payload, "text"), |
| file_path=str(file_path), |
| language="text", |
| chunk_type="text", |
| code=content, |
| ast=ChunkAST( |
| symbol_type="text", |
| name=file_path.name, |
| parent=None, |
| docstring=None |
| ), |
| span=ChunkSpan( |
| start_line=1, |
| end_line=len(lines) |
| ), |
| metadata={ |
| "file_type": "text", |
| "chunk_index": chunk_index, |
| "total_lines": len(lines), |
| "repo_info": repo_metadata or {} |
| }, |
| hierarchy=ChunkHierarchy( |
| is_primary=True, |
| is_extracted=False, |
| depth=0 |
| ) |
| ) |
| |
| def _extract_dependencies(self, requirements_content: str) -> List[str]: |
| """Extract package names from requirements.txt""" |
| dependencies = [] |
| for line in requirements_content.split('\n'): |
| line = line.strip() |
| if line and not line.startswith('#'): |
| |
| package = line.split('==')[0].split('>=')[0].split('<=')[0].strip() |
| if package: |
| dependencies.append(package) |
| return dependencies |
| |
| def _looks_like_binary(self, content: str, threshold: float = 0.3) -> bool: |
| """Check if content looks like binary data""" |
| if not content: |
| return False |
| |
| |
| printable = sum(1 for c in content if 32 <= ord(c) <= 126 or c in '\n\r\t') |
| total = len(content) |
| |
| if total == 0: |
| return False |
| |
| ratio = printable / total |
| return ratio < threshold |
|
|