quantumbit's picture
Upload 41 files
5ff6b14 verified
raw
history blame
5.76 kB
"""
Text Chunker Module
Handles chunking text into smaller pieces with overlap for better context preservation.
"""
import re
from typing import List
from config.config import CHUNK_SIZE, CHUNK_OVERLAP
class TextChunker:
    """Split text into overlapping chunks, preferring sentence boundaries.

    Chunk size and overlap come from the module-level config constants
    ``CHUNK_SIZE`` and ``CHUNK_OVERLAP`` (imported from ``config.config``).
    """

    # Chunks this short or shorter (after stripping) are discarded as noise.
    _MIN_CHUNK_LENGTH = 50
    # How far back from the preferred end to search for a sentence boundary.
    _BOUNDARY_LOOKBACK = 100
    # Abbreviations whose trailing period is NOT a sentence ending.
    _ABBREVIATIONS = frozenset(
        {'Dr', 'Mr', 'Mrs', 'Ms', 'Prof', 'Inc', 'Ltd', 'Corp', 'Co'}
    )

    def __init__(self):
        """Initialize the text chunker from config constants."""
        self.chunk_size = CHUNK_SIZE
        self.chunk_overlap = CHUNK_OVERLAP

    def chunk_text(self, text: str) -> List[str]:
        """
        Chunk text into smaller pieces with overlap.

        Args:
            text: The input text to chunk

        Returns:
            List[str]: List of text chunks, each longer than
            ``_MIN_CHUNK_LENGTH`` characters after stripping.
        """
        print(f"✂️ Chunking text into {self.chunk_size} character chunks with {self.chunk_overlap} overlap")

        # Clean the text
        cleaned_text = self._clean_text(text)

        chunks: List[str] = []
        start = 0
        while start < len(cleaned_text):
            end = start + self.chunk_size
            # Try to end at a sentence boundary (skip for the final chunk).
            if end < len(cleaned_text):
                end = self._find_sentence_boundary(cleaned_text, start, end)

            chunk = cleaned_text[start:end].strip()
            # Only add chunk if it's meaningful
            if chunk and len(chunk) > self._MIN_CHUNK_LENGTH:
                chunks.append(chunk)

            # Move start position with overlap. max(..., start + 1) guarantees
            # forward progress: without it the loop stalls forever when
            # chunk_overlap >= chunk_size, or when the sentence boundary is
            # pulled back to within chunk_overlap of the current start.
            start = max(end - self.chunk_overlap, start + 1)

        print(f"✅ Created {len(chunks)} chunks (size={self.chunk_size}, overlap={self.chunk_overlap})")
        return chunks

    def _clean_text(self, text: str) -> str:
        """
        Clean text by normalizing whitespace and removing excessive line breaks.

        Args:
            text: Raw text to clean

        Returns:
            str: Cleaned text with all whitespace runs collapsed to a single
            space and leading/trailing whitespace removed.
        """
        # Replace multiple whitespace (including newlines/tabs) with one space.
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def _find_sentence_boundary(self, text: str, start: int, preferred_end: int) -> int:
        """
        Find the best sentence boundary near the preferred end position.

        Args:
            text: The full text
            start: Start position of the chunk
            preferred_end: Preferred end position

        Returns:
            int: Position just after a sentence-ending punctuation mark, or
            ``preferred_end`` when none is found within the lookback window.
        """
        # Bound the backwards search so chunks stay close to chunk_size.
        search_start = max(start, preferred_end - self._BOUNDARY_LOOKBACK)
        sentence_endings = ('.', '!', '?')

        # Search backwards from preferred end for the nearest real boundary.
        for i in range(preferred_end - 1, search_start - 1, -1):
            if text[i] in sentence_endings and self._is_valid_sentence_ending(text, i):
                return i + 1
        return preferred_end

    def _is_valid_sentence_ending(self, text: str, pos: int) -> bool:
        """
        Check if a punctuation mark represents a valid sentence ending.

        Args:
            text: The full text
            pos: Position of the punctuation mark

        Returns:
            bool: True if it's a valid sentence ending
        """
        # Avoid breaking on abbreviations like "Dr.", "Mr.", etc.
        # Bug fix: the original only ran this check when the character right
        # before the period was uppercase — but "Dr.", "Mrs.", "Prof." end in
        # a lowercase letter, so the listed abbreviations were never caught.
        # Now we always extract the word preceding the period and test it.
        if pos > 0 and text[pos] == '.':
            word_start = pos
            while word_start > 0 and text[word_start - 1].isalpha():
                word_start -= 1
            if text[word_start:pos] in self._ABBREVIATIONS:
                return False

        # A real sentence ending is followed by whitespace or an uppercase
        # letter; end-of-text also counts.
        if pos + 1 < len(text):
            next_char = text[pos + 1]
            return next_char.isspace() or next_char.isupper()
        return True

    def get_chunk_stats(self, chunks: List[str]) -> dict:
        """
        Get statistics about the created chunks.

        Args:
            chunks: List of text chunks

        Returns:
            dict: Statistics about the chunks (counts, character/word totals,
            and min/avg/max chunk sizes; all zeros for an empty list).
        """
        if not chunks:
            return {
                "total_chunks": 0,
                "total_characters": 0,
                "total_words": 0,
                "avg_chunk_size": 0,
                "min_chunk_size": 0,
                "max_chunk_size": 0
            }

        chunk_sizes = [len(chunk) for chunk in chunks]
        total_chars = sum(chunk_sizes)
        total_words = sum(len(chunk.split()) for chunk in chunks)

        return {
            "total_chunks": len(chunks),
            "total_characters": total_chars,
            "total_words": total_words,
            "avg_chunk_size": total_chars / len(chunks),
            "min_chunk_size": min(chunk_sizes),
            "max_chunk_size": max(chunk_sizes)
        }