Spaces:
Sleeping
Sleeping
File size: 5,758 Bytes
5ff6b14 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
"""
Text Chunker Module
Handles chunking text into smaller pieces with overlap for better context preservation.
"""
import re
from typing import List, Optional

from config.config import CHUNK_SIZE, CHUNK_OVERLAP
class TextChunker:
"""Handles text chunking with overlap and smart boundary detection."""
def __init__(self):
"""Initialize the text chunker."""
self.chunk_size = CHUNK_SIZE
self.chunk_overlap = CHUNK_OVERLAP
def chunk_text(self, text: str) -> List[str]:
"""
Chunk text into smaller pieces with overlap.
Args:
text: The input text to chunk
Returns:
List[str]: List of text chunks
"""
print(f"✂️ Chunking text into {self.chunk_size} character chunks with {self.chunk_overlap} overlap")
# Clean the text
cleaned_text = self._clean_text(text)
chunks = []
start = 0
while start < len(cleaned_text):
end = start + self.chunk_size
# Try to end at sentence boundary
if end < len(cleaned_text):
end = self._find_sentence_boundary(cleaned_text, start, end)
chunk = cleaned_text[start:end].strip()
# Only add chunk if it's meaningful
if chunk and len(chunk) > 50:
chunks.append(chunk)
# Move start position with overlap
start = end - self.chunk_overlap
if start >= len(cleaned_text):
break
print(f"✅ Created {len(chunks)} chunks (size={self.chunk_size}, overlap={self.chunk_overlap})")
return chunks
def _clean_text(self, text: str) -> str:
"""
Clean text by normalizing whitespace and removing excessive line breaks.
Args:
text: Raw text to clean
Returns:
str: Cleaned text
"""
# Replace multiple whitespace with single space
text = re.sub(r'\s+', ' ', text)
return text.strip()
def _find_sentence_boundary(self, text: str, start: int, preferred_end: int) -> int:
"""
Find the best sentence boundary near the preferred end position.
Args:
text: The full text
start: Start position of the chunk
preferred_end: Preferred end position
Returns:
int: Adjusted end position at sentence boundary
"""
# Look for sentence endings within a reasonable range
search_start = max(start, preferred_end - 100)
search_end = min(len(text), preferred_end + 50)
sentence_endings = ['.', '!', '?']
best_end = preferred_end
# Search backwards from preferred end for sentence boundary
for i in range(preferred_end - 1, search_start - 1, -1):
if text[i] in sentence_endings:
# Check if this looks like a real sentence ending
if self._is_valid_sentence_ending(text, i):
best_end = i + 1
break
return best_end
def _is_valid_sentence_ending(self, text: str, pos: int) -> bool:
"""
Check if a punctuation mark represents a valid sentence ending.
Args:
text: The full text
pos: Position of the punctuation mark
Returns:
bool: True if it's a valid sentence ending
"""
# Avoid breaking on abbreviations like "Dr.", "Mr.", etc.
if pos > 0 and text[pos] == '.':
# Look at the character before the period
char_before = text[pos - 1]
if char_before.isupper():
# Might be an abbreviation
word_start = pos - 1
while word_start > 0 and text[word_start - 1].isalpha():
word_start -= 1
word = text[word_start:pos]
# Common abbreviations to avoid breaking on
abbreviations = {'Dr', 'Mr', 'Mrs', 'Ms', 'Prof', 'Inc', 'Ltd', 'Corp', 'Co'}
if word in abbreviations:
return False
# Check if there's a space or newline after the punctuation
if pos + 1 < len(text):
next_char = text[pos + 1]
return next_char.isspace() or next_char.isupper()
return True
def get_chunk_stats(self, chunks: List[str]) -> dict:
"""
Get statistics about the created chunks.
Args:
chunks: List of text chunks
Returns:
dict: Statistics about the chunks
"""
if not chunks:
return {
"total_chunks": 0,
"total_characters": 0,
"total_words": 0,
"avg_chunk_size": 0,
"min_chunk_size": 0,
"max_chunk_size": 0
}
chunk_sizes = [len(chunk) for chunk in chunks]
total_chars = sum(chunk_sizes)
total_words = sum(len(chunk.split()) for chunk in chunks)
return {
"total_chunks": len(chunks),
"total_characters": total_chars,
"total_words": total_words,
"avg_chunk_size": total_chars / len(chunks),
"min_chunk_size": min(chunk_sizes),
"max_chunk_size": max(chunk_sizes)
}
|