File size: 5,758 Bytes
5ff6b14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""

Text Chunker Module



Handles chunking text into smaller pieces with overlap for better context preservation.

"""

import re
from typing import List
from config.config import CHUNK_SIZE, CHUNK_OVERLAP


class TextChunker:
    """Handles text chunking with overlap and smart boundary detection."""

    # Common abbreviations whose trailing period must NOT be treated as a
    # sentence ending. Class-level frozenset: built once, O(1) membership.
    _ABBREVIATIONS = frozenset({'Dr', 'Mr', 'Mrs', 'Ms', 'Prof', 'Inc', 'Ltd', 'Corp', 'Co'})

    # Chunks at or below this length are considered noise and dropped.
    _MIN_CHUNK_LENGTH = 50

    def __init__(self):
        """Initialize the text chunker from project-wide config values."""
        self.chunk_size = CHUNK_SIZE        # target chunk length in characters
        self.chunk_overlap = CHUNK_OVERLAP  # characters shared between consecutive chunks

    def chunk_text(self, text: str) -> List[str]:
        """
        Chunk text into smaller pieces with overlap.

        Args:
            text: The input text to chunk

        Returns:
            List[str]: List of text chunks (fragments of 50 chars or fewer
            are dropped as not meaningful)
        """
        print(f"✂️ Chunking text into {self.chunk_size} character chunks with {self.chunk_overlap} overlap")

        # Normalize whitespace first so boundary detection works on clean text.
        cleaned_text = self._clean_text(text)

        chunks: List[str] = []
        start = 0

        while start < len(cleaned_text):
            end = start + self.chunk_size

            # Try to end at a sentence boundary (unless we're at end of text).
            if end < len(cleaned_text):
                end = self._find_sentence_boundary(cleaned_text, start, end)

            chunk = cleaned_text[start:end].strip()

            # Only add chunk if it's meaningful.
            if chunk and len(chunk) > self._MIN_CHUNK_LENGTH:
                chunks.append(chunk)

            # Move start position with overlap. Guard against non-progress:
            # if the boundary search shrank this chunk to no more than the
            # overlap, stepping back by the overlap would loop forever, so
            # advance to `end` instead.
            next_start = end - self.chunk_overlap
            if next_start <= start:
                next_start = end
            start = next_start

        print(f"✅ Created {len(chunks)} chunks (size={self.chunk_size}, overlap={self.chunk_overlap})")
        return chunks

    def _clean_text(self, text: str) -> str:
        """
        Clean text by normalizing whitespace and removing excessive line breaks.

        Args:
            text: Raw text to clean

        Returns:
            str: Cleaned text (all whitespace runs collapsed to single spaces)
        """
        # Replace any run of whitespace (incl. newlines/tabs) with one space.
        text = re.sub(r'\s+', ' ', text)
        return text.strip()

    def _find_sentence_boundary(self, text: str, start: int, preferred_end: int) -> int:
        """
        Find the best sentence boundary near the preferred end position.

        Args:
            text: The full text
            start: Start position of the chunk
            preferred_end: Preferred end position

        Returns:
            int: Adjusted end position just after a sentence-ending mark, or
            `preferred_end` unchanged if no valid boundary is found within
            100 characters before it.
        """
        # Only look back a bounded distance so chunks don't shrink too much.
        search_start = max(start, preferred_end - 100)
        sentence_endings = ('.', '!', '?')

        # Search backwards from preferred end for a sentence boundary.
        for i in range(preferred_end - 1, search_start - 1, -1):
            if text[i] in sentence_endings and self._is_valid_sentence_ending(text, i):
                return i + 1  # end just past the punctuation

        return preferred_end

    def _is_valid_sentence_ending(self, text: str, pos: int) -> bool:
        """
        Check if a punctuation mark represents a valid sentence ending.

        Args:
            text: The full text
            pos: Position of the punctuation mark

        Returns:
            bool: True if it's a valid sentence ending
        """
        # Avoid breaking on abbreviations like "Dr.", "Mr.", etc.
        # (Check any alphabetic char before the period — the last letter of
        # most abbreviations is lowercase, e.g. the 'r' in "Dr".)
        if text[pos] == '.' and pos > 0 and text[pos - 1].isalpha():
            # Walk back to the start of the word preceding the period.
            word_start = pos
            while word_start > 0 and text[word_start - 1].isalpha():
                word_start -= 1

            if text[word_start:pos] in self._ABBREVIATIONS:
                return False

        # A real sentence ending is followed by whitespace or a capital letter.
        if pos + 1 < len(text):
            next_char = text[pos + 1]
            return next_char.isspace() or next_char.isupper()

        # Punctuation at the very end of the text counts as a boundary.
        return True

    def get_chunk_stats(self, chunks: List[str]) -> dict:
        """
        Get statistics about the created chunks.

        Args:
            chunks: List of text chunks

        Returns:
            dict: total_chunks, total_characters, total_words,
            avg_chunk_size, min_chunk_size, max_chunk_size (all zero for
            an empty input list)
        """
        if not chunks:
            return {
                "total_chunks": 0,
                "total_characters": 0,
                "total_words": 0,
                "avg_chunk_size": 0,
                "min_chunk_size": 0,
                "max_chunk_size": 0
            }

        chunk_sizes = [len(chunk) for chunk in chunks]
        total_chars = sum(chunk_sizes)
        total_words = sum(len(chunk.split()) for chunk in chunks)

        return {
            "total_chunks": len(chunks),
            "total_characters": total_chars,
            "total_words": total_words,
            "avg_chunk_size": total_chars / len(chunks),
            "min_chunk_size": min(chunk_sizes),
            "max_chunk_size": max(chunk_sizes)
        }