"""
Text Chunker Module

Handles chunking text into smaller pieces with overlap for better context preservation.
"""
import re
from typing import List

from config.config import CHUNK_SIZE, CHUNK_OVERLAP
class TextChunker:
    """Handles text chunking with overlap and smart boundary detection."""

    def __init__(self, chunk_size=None, chunk_overlap=None):
        """
        Initialize the text chunker.

        Args:
            chunk_size: Target chunk length in characters. Falls back to the
                project-wide CHUNK_SIZE when omitted.
            chunk_overlap: Characters of overlap between consecutive chunks.
                Falls back to the project-wide CHUNK_OVERLAP when omitted.
        """
        # Lazy fallback keeps TextChunker() backward compatible while allowing
        # explicit sizes (useful for testing and reuse outside this project).
        self.chunk_size = CHUNK_SIZE if chunk_size is None else chunk_size
        self.chunk_overlap = CHUNK_OVERLAP if chunk_overlap is None else chunk_overlap

    def chunk_text(self, text: str) -> List[str]:
        """
        Chunk text into smaller pieces with overlap.

        Args:
            text: The input text to chunk

        Returns:
            List[str]: List of text chunks. Fragments of 50 characters or
            fewer are dropped as not meaningful. Each chunk is at most
            ``chunk_size`` characters (boundary search only moves the end
            backwards, never forwards).
        """
        print(f"✂️ Chunking text into {self.chunk_size} character chunks with {self.chunk_overlap} overlap")
        # Clean the text
        cleaned_text = self._clean_text(text)
        chunks = []
        start = 0
        while start < len(cleaned_text):
            end = start + self.chunk_size
            # Try to end at a sentence boundary
            if end < len(cleaned_text):
                end = self._find_sentence_boundary(cleaned_text, start, end)
            chunk = cleaned_text[start:end].strip()
            # Only add chunk if it's meaningful
            if chunk and len(chunk) > 50:
                chunks.append(chunk)
            # BUG FIX: the original `start = end - self.chunk_overlap` could
            # move `start` backwards (or not at all) whenever the sentence
            # boundary pulled `end` to within `chunk_overlap` of `start`,
            # producing duplicate chunks, negative slice indices, or an
            # infinite loop. max(..., start + 1) guarantees forward progress.
            start = max(end - self.chunk_overlap, start + 1)
        print(f"✅ Created {len(chunks)} chunks (size={self.chunk_size}, overlap={self.chunk_overlap})")
        return chunks

    def _clean_text(self, text: str) -> str:
        """
        Clean text by normalizing whitespace and removing excessive line breaks.

        Args:
            text: Raw text to clean

        Returns:
            str: Cleaned text with every whitespace run (including newlines)
            collapsed to a single space and outer whitespace stripped
        """
        # Replace multiple whitespace with single space
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def _find_sentence_boundary(self, text: str, start: int, preferred_end: int) -> int:
        """
        Find the best sentence boundary at or before the preferred end position.

        Searches backwards from ``preferred_end`` (at most 100 characters) for
        valid sentence-ending punctuation; falls back to ``preferred_end`` when
        none is found.

        Args:
            text: The full text
            start: Start position of the chunk
            preferred_end: Preferred end position

        Returns:
            int: Adjusted end position at a sentence boundary (never greater
            than ``preferred_end``)
        """
        # Look back at most 100 characters for a sentence ending. (The
        # original also computed a forward search limit that was never used;
        # removed as dead code.)
        search_start = max(start, preferred_end - 100)
        sentence_endings = ('.', '!', '?')
        best_end = preferred_end
        # Search backwards from preferred end for a sentence boundary
        for i in range(preferred_end - 1, search_start - 1, -1):
            if text[i] in sentence_endings and self._is_valid_sentence_ending(text, i):
                best_end = i + 1
                break
        return best_end

    def _is_valid_sentence_ending(self, text: str, pos: int) -> bool:
        """
        Check if a punctuation mark represents a valid sentence ending.

        Args:
            text: The full text
            pos: Position of the punctuation mark

        Returns:
            bool: True if it's a valid sentence ending
        """
        # Avoid breaking on abbreviations like "Dr.", "Mr.", etc.
        if pos > 0 and text[pos] == '.':
            # BUG FIX: the original only ran this check when the character
            # immediately before the period was uppercase, but "Dr", "Mr",
            # "Mrs", "Prof", ... all end in a lowercase letter, so the
            # abbreviation guard never fired. Always extract the preceding
            # word and compare it against the known abbreviations.
            word_start = pos
            while word_start > 0 and text[word_start - 1].isalpha():
                word_start -= 1
            word = text[word_start:pos]
            # Common abbreviations to avoid breaking on
            abbreviations = {'Dr', 'Mr', 'Mrs', 'Ms', 'Prof', 'Inc', 'Ltd', 'Corp', 'Co'}
            if word in abbreviations:
                return False
        # A real sentence ending is followed by whitespace or an uppercase
        # letter; end-of-text also counts as a valid ending.
        if pos + 1 < len(text):
            next_char = text[pos + 1]
            return next_char.isspace() or next_char.isupper()
        return True

    def get_chunk_stats(self, chunks: List[str]) -> dict:
        """
        Get statistics about the created chunks.

        Args:
            chunks: List of text chunks

        Returns:
            dict: total_chunks, total_characters, total_words,
            avg_chunk_size, min_chunk_size, max_chunk_size (all zero for an
            empty list)
        """
        if not chunks:
            return {
                "total_chunks": 0,
                "total_characters": 0,
                "total_words": 0,
                "avg_chunk_size": 0,
                "min_chunk_size": 0,
                "max_chunk_size": 0
            }
        chunk_sizes = [len(chunk) for chunk in chunks]
        total_chars = sum(chunk_sizes)
        total_words = sum(len(chunk.split()) for chunk in chunks)
        return {
            "total_chunks": len(chunks),
            "total_characters": total_chars,
            "total_words": total_words,
            "avg_chunk_size": total_chars / len(chunks),
            "min_chunk_size": min(chunk_sizes),
            "max_chunk_size": max(chunk_sizes)
        }