| """Semantic chunker for processing markdown documents with hierarchical structure.""" |
|
|
| import hashlib |
| import json |
| import re |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple |
|
|
| from llama_index.core.node_parser import SentenceSplitter |
| from pydantic import BaseModel, Field |
| from rich.console import Console |
| from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn |
|
|
| from config.settings import settings |
|
|
|
|
class ChunkNode(BaseModel):
    """
    A single chunk of document text together with its provenance.

    Attributes:
        chunk_id: Unique, hash-based identifier of the chunk
        content: Text carried by the chunk
        parent_section: Header of the section the chunk was cut from
        document_title: Title of the originating article
        source_url: EyeWiki URL of the originating document
        chunk_index: Zero-based position of the chunk within its document
        token_count: Approximate number of tokens in ``content``
        metadata: Extra metadata copied from the source document
    """

    # Required: identity and payload of the chunk.
    chunk_id: str = Field(..., description="Unique identifier (hash-based)")
    content: str = Field(..., description="Text content of the chunk")

    # Optional provenance; each defaults to an empty string.
    parent_section: str = Field(default="", description="Parent section header")
    document_title: str = Field(default="", description="Original document title")
    source_url: str = Field(default="", description="Source URL")

    # Required, validated as non-negative (ge=0).
    chunk_index: int = Field(..., ge=0, description="Position in document")
    token_count: int = Field(..., ge=0, description="Approximate token count")

    # default_factory gives every instance its own dict.
    metadata: Dict = Field(default_factory=dict, description="Additional metadata")

    def to_dict(self) -> Dict:
        """Return the chunk's fields as a plain dictionary."""
        as_dict = self.model_dump()
        return as_dict

    @classmethod
    def from_dict(cls, data: Dict) -> "ChunkNode":
        """Build a ChunkNode from a dictionary of field values."""
        node = cls(**data)
        return node
|
|
|
|
class SemanticChunker:
    """
    Hierarchical semantic chunker that respects markdown structure.

    Features:
        - Splits on ## headers first (sections)
        - Then splits large sections into semantic chunks
        - Preserves parent section context
        - Uses LlamaIndex SentenceSplitter for semantic splitting
        - Configurable chunk sizes and overlap
    """

    # Level-2 markdown header ("## Title"). Deeper headers ("###", ...) do not
    # match because whitespace is required right after the two hashes.
    _SECTION_HEADER_RE = re.compile(r"^##\s+(.+?)$")

    # Opening or closing line of a fenced code block: three or more backticks
    # or tildes, indented by at most three spaces.
    _CODE_FENCE_RE = re.compile(r"^ {0,3}(`{3,}|~{3,})")

    def __init__(
        self,
        chunk_size: Optional[int] = None,
        chunk_overlap: Optional[int] = None,
        min_chunk_size: int = 100,
    ):
        """
        Initialize the SemanticChunker.

        Args:
            chunk_size: Target chunk size in tokens (default: from settings)
            chunk_overlap: Overlap between chunks in tokens (default: from
                settings). An explicit 0 is honored and means "no overlap".
            min_chunk_size: Minimum chunk size to keep (default: 100 tokens)
        """
        # A falsy chunk_size (None or 0) falls back to the configured value.
        self.chunk_size = chunk_size or settings.chunk_size
        # Compare against None: 0 is a legitimate overlap and must not be
        # replaced by the configured default.
        self.chunk_overlap = (
            settings.chunk_overlap if chunk_overlap is None else chunk_overlap
        )
        self.min_chunk_size = min_chunk_size

        self.sentence_splitter = SentenceSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
        )

        self.console = Console()

    def _estimate_tokens(self, text: str) -> int:
        """
        Estimate token count for text.

        Uses a simple heuristic of roughly 4 characters per token, rounded
        down. No tokenizer is involved, so the result is only approximate.

        Args:
            text: Input text

        Returns:
            Estimated token count
        """
        return len(text) // 4

    def _generate_chunk_id(self, content: str, chunk_index: int, source_url: str) -> str:
        """
        Generate a chunk ID by hashing the chunk's origin and leading text.

        The ID is the first 16 hex digits of the SHA-256 of
        ``"{source_url}:{chunk_index}:{first 100 characters of content}"``,
        so it is stable across runs for the same inputs.

        Args:
            content: Chunk content (only the first 100 characters are hashed)
            chunk_index: Index of chunk
            source_url: Source URL

        Returns:
            16-character hexadecimal chunk identifier
        """
        unique_string = f"{source_url}:{chunk_index}:{content[:100]}"
        return hashlib.sha256(unique_string.encode()).hexdigest()[:16]

    def _parse_markdown_sections(self, markdown: str) -> List[Tuple[str, str]]:
        """
        Parse markdown into sections based on ## headers.

        Text before the first header becomes a section with an empty header.
        Each header line is kept as the first line of its section's content.
        Lines inside fenced code blocks (``` or ~~~) are never treated as
        headers, so a "## ..." comment in a code sample does not split the
        surrounding section.

        Args:
            markdown: Markdown content

        Returns:
            List of (header, content) tuples in document order
        """
        sections = []
        header = ""
        body = []
        open_fence = None  # marker of the code fence we are inside, if any

        for line in markdown.split("\n"):
            fence = self._CODE_FENCE_RE.match(line)
            if fence:
                marker = fence.group(1)
                if open_fence is None:
                    open_fence = marker
                elif (
                    marker[0] == open_fence[0]
                    and len(marker) >= len(open_fence)
                    and not line[fence.end():].strip()
                ):
                    # A closing fence uses the same character, is at least as
                    # long as the opener and carries no trailing text.
                    open_fence = None
                body.append(line)
                continue

            match = None if open_fence else self._SECTION_HEADER_RE.match(line)
            if match:
                # Flush whatever was collected before this header.
                if body:
                    sections.append((header, "\n".join(body)))
                header = match.group(1).strip()
                body = [line]
            else:
                body.append(line)

        if body:
            sections.append((header, "\n".join(body)))

        return sections

    def _split_large_section(self, text: str) -> List[str]:
        """
        Split large section into semantic chunks using LlamaIndex.

        Args:
            text: Section text to split

        Returns:
            List of text chunks produced by the configured SentenceSplitter
        """
        return self.sentence_splitter.split_text(text)

    def _clean_content(self, content: str) -> str:
        """
        Clean chunk content by removing excessive whitespace.

        Runs of three or more newlines are collapsed to a single blank line,
        then leading and trailing whitespace is stripped.

        Args:
            content: Raw content

        Returns:
            Cleaned content
        """
        return re.sub(r"\n{3,}", "\n\n", content).strip()

    def chunk_document(
        self,
        markdown_content: str,
        metadata: Dict,
    ) -> "List[ChunkNode]":
        """
        Chunk a markdown document with hierarchical structure.

        Process:
            1. Parse document into sections by ## headers
            2. Sections whose estimated size fits in ``chunk_size`` are kept whole
            3. Larger sections are split into semantic chunks
            4. Any piece estimated below ``min_chunk_size`` tokens is dropped
            5. Each chunk records the header of the section it came from

        Args:
            markdown_content: Markdown text content
            metadata: Document metadata. 'url' and 'title' are read if present
                (defaults: "" and "Untitled"); the whole dict is attached to
                every chunk.

        Returns:
            List of ChunkNode objects with consecutive ``chunk_index`` values
        """
        source_url = metadata.get("url", "")
        document_title = metadata.get("title", "Untitled")

        sections = self._parse_markdown_sections(markdown_content)
        if not sections:
            sections = [("", markdown_content)]

        chunks = []
        for section_header, raw_section in sections:
            section_content = self._clean_content(raw_section)
            if not section_content:
                continue

            if self._estimate_tokens(section_content) <= self.chunk_size:
                pieces = [section_content]
            else:
                pieces = [
                    self._clean_content(piece)
                    for piece in self._split_large_section(section_content)
                ]

            for piece in pieces:
                piece_tokens = self._estimate_tokens(piece)
                if piece_tokens < self.min_chunk_size:
                    continue

                # Only kept pieces consume an index, so the next index is
                # always the number of chunks emitted so far.
                chunk_index = len(chunks)
                chunks.append(
                    ChunkNode(
                        chunk_id=self._generate_chunk_id(piece, chunk_index, source_url),
                        content=piece,
                        parent_section=section_header,
                        document_title=document_title,
                        source_url=source_url,
                        chunk_index=chunk_index,
                        token_count=piece_tokens,
                        metadata=metadata,
                    )
                )

        return chunks

    def _process_file(self, md_file: Path, output_dir: Path) -> "Optional[List[ChunkNode]]":
        """Chunk one markdown file and write its chunks; return None if it was skipped."""
        json_file = md_file.with_suffix(".json")
        if not json_file.exists():
            self.console.print(
                f"[yellow]Skipping {md_file.name}: No metadata file found[/yellow]"
            )
            return None

        with open(md_file, "r", encoding="utf-8") as f:
            markdown_content = f.read()

        with open(json_file, "r", encoding="utf-8") as f:
            metadata = json.load(f)

        if self._estimate_tokens(markdown_content) < self.min_chunk_size:
            self.console.print(
                f"[yellow]Skipping {md_file.name}: Content too small[/yellow]"
            )
            return None

        chunks = self.chunk_document(markdown_content, metadata)
        if not chunks:
            self.console.print(
                f"[yellow]Skipping {md_file.name}: No chunks created[/yellow]"
            )
            return None

        output_file = output_dir / f"{md_file.stem}_chunks.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump([chunk.to_dict() for chunk in chunks], f, indent=2, ensure_ascii=False)

        return chunks

    def _print_summary(self, stats: Dict[str, int]) -> None:
        """Print the end-of-run totals and per-document / per-chunk averages."""
        self.console.print("\n[bold cyan]Chunking Summary[/bold cyan]")
        self.console.print(f"Files processed: {stats['processed']}")
        self.console.print(f"Files skipped: {stats['skipped']}")
        self.console.print(f"Files failed: {stats['failed']}")
        self.console.print(f"Total chunks created: {stats['total_chunks']}")
        self.console.print(f"Total tokens: {stats['total_tokens']:,}")

        if stats["processed"] > 0:
            avg_chunks = stats["total_chunks"] / stats["processed"]
            avg_tokens = (
                stats["total_tokens"] / stats["total_chunks"]
                if stats["total_chunks"] > 0
                else 0
            )
            self.console.print(f"Average chunks per document: {avg_chunks:.1f}")
            self.console.print(f"Average tokens per chunk: {avg_tokens:.1f}")

    def chunk_directory(
        self,
        input_dir: Path,
        output_dir: Path,
        pattern: str = "*.md",
    ) -> Dict[str, int]:
        """
        Process all markdown files in a directory.

        For each matching file, looks for a sibling .json metadata file with
        the same stem, chunks the document, and writes the chunks to
        ``<output_dir>/<stem>_chunks.json``. Files without metadata, with too
        little content, or yielding no chunks are skipped; a file that raises
        is reported and counted as failed without stopping the run.

        Args:
            input_dir: Directory containing markdown files
            output_dir: Directory to save chunked outputs (created if missing)
            pattern: Glob pattern for files to process (default: "*.md")

        Returns:
            Dictionary with the keys 'processed', 'failed', 'skipped',
            'total_chunks' and 'total_tokens'. All keys are present even when
            no file matched.
        """
        input_dir = Path(input_dir)
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        stats = {
            "processed": 0,
            "failed": 0,
            "skipped": 0,
            "total_chunks": 0,
            "total_tokens": 0,
        }

        # Sorted so that processing order and console output are reproducible.
        md_files = sorted(input_dir.glob(pattern))
        if not md_files:
            self.console.print(
                f"[yellow]No files matching '{pattern}' found in {input_dir}[/yellow]"
            )
            return stats

        self.console.print("\n[bold cyan]Chunking Documents[/bold cyan]")
        self.console.print(f"Input: {input_dir}")
        self.console.print(f"Output: {output_dir}")
        self.console.print(f"Files found: {len(md_files)}\n")

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            console=self.console,
        ) as progress:
            task = progress.add_task(
                "[cyan]Processing...",
                total=len(md_files),
            )

            for md_file in md_files:
                try:
                    chunks = self._process_file(md_file, output_dir)
                except Exception as e:
                    # Per-file boundary: report and count the failure so one
                    # bad file does not abort the whole batch.
                    self.console.print(f"[red]Error processing {md_file.name}: {e}[/red]")
                    stats["failed"] += 1
                else:
                    if chunks is None:
                        stats["skipped"] += 1
                    else:
                        stats["processed"] += 1
                        stats["total_chunks"] += len(chunks)
                        stats["total_tokens"] += sum(chunk.token_count for chunk in chunks)
                        progress.update(
                            task,
                            description=f"[cyan]Processing ({stats['processed']} done, {stats['total_chunks']} chunks): {md_file.name[:40]}...",
                        )
                finally:
                    # Exactly one tick per file, whatever its outcome.
                    progress.advance(task)

        self._print_summary(stats)
        return stats
|
|