from langchain.text_splitter import TextSplitter
from langchain.schema import Document


class StructureAwareTextSplitter(TextSplitter):
    """
    A custom text splitter that creates context-aware document chunks from
    structured HTML content.

    This splitter buffers paragraphs, lists, spans, and tables together into
    chunks up to a specified size, preserving section headers and content
    structure. Tables are combined with surrounding content when possible,
    but split into their own chunk if too large.

    Useful for web page or wiki-style content where structure and context
    are important for downstream retrieval or LLM tasks.

    Args:
        chunk_size (int): Maximum number of words per chunk.
        chunk_overlap (int): Number of words to overlap between chunks
            (not currently used).

    Methods:
        split_text(text): Dummy implementation to satisfy the abstract base
            class.
        split_documents(structured_blocks, metadata=None): Splits structured
            content blocks into Document objects with preserved section
            headers and types.
    """

    def __init__(self, chunk_size=500, chunk_overlap=50):
        super().__init__(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    # TODO: To be implemented
    def split_text(self, text):
        # Dummy implementation to satisfy the abstract base class.
        return [text]

    def split_documents(self, structured_blocks, metadata=None):
        """
        Split structured content blocks into Document chunks.

        Args:
            structured_blocks (list[dict]): Blocks with a 'type' key, one of
                'header', 'paragraph', 'list', 'table', or 'span'. Lists
                carry their content under 'items'; all other types under
                'text'. (Assumed schema — confirm against the producer of
                these blocks.)
            metadata (dict | None): Extra metadata merged into every
                produced Document.

        Returns:
            list[Document]: Chunked documents. Each chunk is prefixed with
            its most recent section header and tagged with
            'section_header' and 'type' metadata.
        """
        current_chunk = ""
        current_words_cnt = 0
        current_header = ""
        documents = []

        def add_document(content, header, type_):
            # Attach section header, chunk type, and caller-supplied
            # metadata to each emitted chunk.
            documents.append(Document(
                page_content=content.strip(),
                metadata={
                    "section_header": header,
                    "type": type_,
                    **(metadata or {})
                }
            ))

        def flush(type_="mixed"):
            # Emit the buffered chunk (if non-empty) and reset the buffer.
            # Fix: the original paragraph/list path flushed unconditionally,
            # so an oversized first block produced a header-only document;
            # it also labeled the flushed mixed buffer with the incoming
            # block's type instead of 'mixed' like every other flush site.
            nonlocal current_chunk, current_words_cnt
            if current_chunk.strip():
                add_document(f"{current_header}\n\n{current_chunk}",
                             current_header, type_)
            current_chunk = ""
            current_words_cnt = 0

        for block in structured_blocks:
            type_ = block['type']
            if type_ == 'header':
                # Headers are not emitted alone; they prefix later chunks.
                current_header = block['text']
            elif type_ in ('paragraph', 'list', 'span'):
                if type_ == 'list':
                    text = "\n".join(block['items']) + "\n"
                else:
                    text = block['text']
                words_cnt = len(text.split())
                if current_words_cnt + words_cnt > self._chunk_size:
                    flush()
                current_chunk += text + "\n"
                current_words_cnt += words_cnt
            elif type_ == 'table':
                table_text = f"{current_header} [Table]\n\n{block['text']}\n"
                words_cnt = len(table_text.split())
                if current_words_cnt + words_cnt <= self._chunk_size:
                    # Table fits alongside the buffered content.
                    current_chunk += table_text
                    current_words_cnt += words_cnt
                else:
                    flush()
                    if words_cnt > self._chunk_size:
                        # Table alone exceeds the limit: emit it as its
                        # own standalone chunk.
                        add_document(table_text, current_header, 'table')
                    else:
                        current_chunk = table_text
                        current_words_cnt = words_cnt

        # Flush any trailing buffered content.
        flush()
        return documents