""" This module is responsible for processing the corpus and feeding it into chromaDB. It will receive a corpus of text. It will then split it into chunks of specified length. For each of those chunks, it will append surrounding context. It will only include full words. """ import re import bisect import extensions.superboogav2.parameters as parameters from .data_preprocessor import TextPreprocessorBuilder, TextSummarizer from .chromadb import ChromaCollector def preprocess_text_no_summary(text) -> str: builder = TextPreprocessorBuilder(text) if parameters.should_to_lower(): builder.to_lower() if parameters.should_remove_punctuation(): builder.remove_punctuation() if parameters.should_remove_specific_pos(): builder.remove_specific_pos() if parameters.should_remove_stopwords(): builder.remove_stopwords if parameters.should_lemmatize(): builder.lemmatize() if parameters.should_merge_spaces(): builder.merge_spaces if parameters.should_strip(): builder.strip() if parameters.get_num_conversion_strategy(): if parameters.get_num_conversion_strategy() == parameters.NUM_TO_WORD_METHOD: builder.num_to_word(parameters.get_min_num_length()) elif parameters.get_num_conversion_strategy() == parameters.NUM_TO_CHAR_METHOD: builder.num_to_char(parameters.get_min_num_length()) elif parameters.get_num_conversion_strategy() == parameters.NUM_TO_CHAR_LONG_METHOD: builder.num_to_char_long(parameters.get_min_num_length()) return builder.build() def preprocess_text(text) -> list[str]: important_sentences = TextSummarizer.process_long_text(text, parameters.get_min_num_sentences()) return [preprocess_text_no_summary(sent) for sent in important_sentences] def _create_chunks_with_context(corpus, chunk_len, context_left, context_right): """ This function takes a corpus of text and splits it into chunks of a specified length, then adds a specified amount of context to each chunk. The context is added by first going backwards from the start of the chunk and then going forwards from the end of the chunk, ensuring that the context includes only whole words and that the total context length does not exceed the specified limit. This function uses binary search for efficiency. Returns: chunks (list of str): The chunks of text. chunks_with_context (list of str): The chunks of text with added context. chunk_with_context_start_indices (list of int): The starting indices of each chunk with context in the corpus. """ words = re.split('(\\s+)', corpus) word_start_indices = [0] current_index = 0 for word in words: current_index += len(word) word_start_indices.append(current_index) chunks, chunk_lengths, chunk_start_indices, chunk_with_context_start_indices = [], [], [], [] current_length = 0 current_index = 0 chunk = [] for word in words: if current_length + len(word) > chunk_len: chunks.append(''.join(chunk)) chunk_lengths.append(current_length) chunk_start_indices.append(current_index - current_length) chunk = [word] current_length = len(word) else: chunk.append(word) current_length += len(word) current_index += len(word) if chunk: chunks.append(''.join(chunk)) chunk_lengths.append(current_length) chunk_start_indices.append(current_index - current_length) chunks_with_context = [] for start_index, chunk_length in zip(chunk_start_indices, chunk_lengths): context_start_index = bisect.bisect_right(word_start_indices, start_index - context_left) context_end_index = bisect.bisect_left(word_start_indices, start_index + chunk_length + context_right) # Combine all the words in the context range (before, chunk, and after) chunk_with_context = ''.join(words[context_start_index:context_end_index]) chunks_with_context.append(chunk_with_context) # Determine the start index of the chunk with context chunk_with_context_start_index = word_start_indices[context_start_index] chunk_with_context_start_indices.append(chunk_with_context_start_index) return chunks, chunks_with_context, chunk_with_context_start_indices def _clear_chunks(data_chunks, data_chunks_with_context, data_chunk_starting_indices): distinct_data_chunks = [] distinct_data_chunks_with_context = [] distinct_data_chunk_starting_indices = [] seen_chunks = dict() for chunk, context, index in zip(data_chunks, data_chunks_with_context, data_chunk_starting_indices): # Skip the chunk if it does not contain any alphanumeric characters if not any(char.isalnum() for char in chunk): continue seen_chunk_start = seen_chunks.get(chunk) if seen_chunk_start: # If we've already seen this exact chunk, and the context around it it very close to the seen chunk, then skip it. if abs(seen_chunk_start-index) < parameters.get_delta_start(): continue distinct_data_chunks.append(chunk) distinct_data_chunks_with_context.append(context) distinct_data_chunk_starting_indices.append(index) seen_chunks[chunk] = index return distinct_data_chunks, distinct_data_chunks_with_context, distinct_data_chunk_starting_indices def process_and_add_to_collector(corpus: str, collector: ChromaCollector, clear_collector_before_adding: bool, metadata: dict): # Defining variables chunk_lens = [int(len.strip()) for len in parameters.get_chunk_len().split(',')] context_len = [int(len.strip()) for len in parameters.get_context_len().split(',')] if len(context_len) >= 3: raise f"Context len has too many values: {len(context_len)}" if len(context_len) == 2: context_left = context_len[0] context_right = context_len[1] else: context_left = context_right = context_len[0] data_chunks = [] data_chunks_with_context = [] data_chunk_starting_indices = [] # Handling chunk_regex if parameters.get_chunk_regex(): if parameters.get_chunk_separator(): cumulative_length = 0 # This variable will store the length of the processed corpus sections = corpus.split(parameters.get_chunk_separator()) for section in sections: special_chunks = list(re.finditer(parameters.get_chunk_regex(), section)) for match in special_chunks: chunk = match.group(0) start_index = match.start() end_index = start_index + len(chunk) context = section[max(0, start_index - context_left):min(len(section), end_index + context_right)] data_chunks.append(chunk) data_chunks_with_context.append(context) data_chunk_starting_indices.append(cumulative_length + max(0, start_index - context_left)) cumulative_length += len(section) + len(parameters.get_chunk_separator()) # Update the length of the processed corpus else: special_chunks = list(re.finditer(parameters.get_chunk_regex(), corpus)) for match in special_chunks: chunk = match.group(0) start_index = match.start() end_index = start_index + len(chunk) context = corpus[max(0, start_index - context_left):min(len(corpus), end_index + context_right)] data_chunks.append(chunk) data_chunks_with_context.append(context) data_chunk_starting_indices.append(max(0, start_index - context_left)) for chunk_len in chunk_lens: # Breaking the data into chunks and adding those to the db if parameters.get_chunk_separator(): cumulative_length = 0 # This variable will store the length of the processed corpus sections = corpus.split(parameters.get_chunk_separator()) for section in sections: chunks, chunks_with_context, context_start_indices = _create_chunks_with_context(section, chunk_len, context_left, context_right) context_start_indices = [cumulative_length + i for i in context_start_indices] # Add the length of the processed corpus to each start index data_chunks.extend(chunks) data_chunks_with_context.extend(chunks_with_context) data_chunk_starting_indices.extend(context_start_indices) cumulative_length += len(section) + len(parameters.get_chunk_separator()) # Update the length of the processed corpus else: chunks, chunks_with_context, context_start_indices = _create_chunks_with_context(corpus, chunk_len, context_left, context_right) data_chunks.extend(chunks) data_chunks_with_context.extend(chunks_with_context) data_chunk_starting_indices.extend(context_start_indices) data_chunks = [preprocess_text_no_summary(chunk) for chunk in data_chunks] data_chunks, data_chunks_with_context, data_chunk_starting_indices = _clear_chunks( data_chunks, data_chunks_with_context, data_chunk_starting_indices ) if clear_collector_before_adding: collector.clear() collector.add(data_chunks, data_chunks_with_context, data_chunk_starting_indices, [metadata]*len(data_chunks) if metadata is not None else None)