"""Chunk scraped JSON documents and embed them with a sentence-transformers model."""

import json
import os
import pickle
import re
from typing import Any, Dict, List, Optional

from sentence_transformers import SentenceTransformer
from tqdm import tqdm


class DocumentChunker:
    """Chunks scraped documents and builds embeddings for them."""

    def __init__(self, input_dir: str = "data/raw",
                 output_dir: str = "data/processed",
                 embedding_dir: str = "data/embeddings",
                 model_name: str = "BAAI/bge-small-en-v1.5"):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.embedding_dir = embedding_dir

        # Make sure the output locations exist before any processing runs.
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(embedding_dir, exist_ok=True)

        # Embedding model used by create_embeddings().
        self.model = SentenceTransformer(model_name)

    def load_documents(self) -> List[Dict[str, Any]]:
        """Load all documents from the input directory."""
        documents = []

        # Each scraped document is stored as an individual JSON file.
        for filename in os.listdir(self.input_dir):
            if filename.endswith('.json'):
                filepath = os.path.join(self.input_dir, filename)
                with open(filepath, 'r') as f:
                    documents.append(json.load(f))

        return documents

    def chunk_by_headings(self, document: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Split a document into chunks based on its headings."""
        chunks = []

        # Documents without headings become a single chunk.
        if not document.get('headings'):
            chunk = {
                'title': document['title'],
                'content': document['content'],
                'url': document['url'],
                'categories': document.get('categories', []),
                'scraped_at': document['scraped_at'],
                'document_type': document.get('document_type', 'webpage')
            }
            chunks.append(chunk)
            return chunks

        # Process headings in the order they appear in the document; the scan
        # below walks the content top to bottom, so sorting by heading level
        # would split sections incorrectly.
        headings = document['headings']
        content = document['content']

        current_title = document['title']
        current_content = ""
        content_lines = content.split('\n')
        line_index = 0

        for heading in headings:
            heading_text = heading['text']

            # Find the line where this heading starts a new section.
            heading_found = False
            for i in range(line_index, len(content_lines)):
                if heading_text in content_lines[i]:
                    # Close out the previous section before starting a new one.
                    if current_content.strip():
                        chunk = {
                            'title': current_title,
                            'content': current_content.strip(),
                            'url': document['url'],
                            'categories': document.get('categories', []),
                            'scraped_at': document['scraped_at'],
                            'document_type': document.get('document_type', 'webpage')
                        }
                        chunks.append(chunk)

                    current_title = heading_text
                    current_content = ""
                    line_index = i + 1
                    heading_found = True
                    break

            # Headings that never appear in the body are kept as plain text.
            if not heading_found:
                current_content += heading_text + "\n"

            # Accumulate lines until the next heading is reached.
            if line_index < len(content_lines):
                for i in range(line_index, len(content_lines)):
                    if any(h['text'] in content_lines[i] for h in headings if h['text'] != heading_text):
                        break
                    current_content += content_lines[i] + "\n"
                    line_index = i + 1

        # Flush whatever is left after the last heading.
        if current_content.strip():
            chunk = {
                'title': current_title,
                'content': current_content.strip(),
                'url': document['url'],
                'categories': document.get('categories', []),
                'scraped_at': document['scraped_at'],
                'document_type': document.get('document_type', 'webpage')
            }
            chunks.append(chunk)

        return chunks

    def chunk_faqs(self, document: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Extract FAQs as individual chunks."""
        chunks = []

        if not document.get('faqs'):
            return chunks

        for faq in document['faqs']:
            chunk = {
                'title': faq['question'],
                'content': faq['answer'],
                'url': document['url'],
                'categories': document.get('categories', []),
                'scraped_at': document['scraped_at'],
                'document_type': 'faq',
                'question': faq['question']
            }
            chunks.append(chunk)

        return chunks

    def chunk_semantically(self, document: Dict[str, Any],
                           max_chunk_size: int = 1000,
                           overlap: int = 100) -> List[Dict[str, Any]]:
        """Split a document into size-bounded chunks with a small overlap."""
        chunks = []
        content = document['content']

        if not content.strip():
            return chunks

        # Split on blank lines so chunk boundaries fall between paragraphs.
        paragraphs = re.split(r'\n\s*\n', content)

        current_chunk = ""
        current_length = 0

        for para in paragraphs:
            para = para.strip()
            if not para:
                continue

            para_length = len(para)

            # Oversized paragraphs are further split into sentences.
            if para_length > max_chunk_size:
                sentences = re.split(r'(?<=[.!?])\s+', para)
                for sentence in sentences:
                    sentence = sentence.strip()
                    sentence_length = len(sentence)

                    if current_length + sentence_length <= max_chunk_size:
                        current_chunk += sentence + " "
                        current_length += sentence_length + 1
                    else:
                        # Flush the current chunk before it exceeds the limit.
                        if current_chunk:
                            chunk = {
                                'title': document['title'],
                                'content': current_chunk.strip(),
                                'url': document['url'],
                                'categories': document.get('categories', []),
                                'scraped_at': document['scraped_at'],
                                'document_type': document.get('document_type', 'webpage')
                            }
                            chunks.append(chunk)

                        # Start the next chunk with a short tail of the previous
                        # one so neighbouring chunks overlap.
                        tail = current_chunk.strip()[-overlap:] if overlap else ""
                        current_chunk = (tail + " " if tail else "") + sentence + " "
                        current_length = len(current_chunk)

            # Paragraph fits in the current chunk.
            elif current_length + para_length <= max_chunk_size:
                current_chunk += para + "\n\n"
                current_length += para_length + 2

            # Current chunk is full: flush it and start a new one.
            else:
                if current_chunk:
                    chunk = {
                        'title': document['title'],
                        'content': current_chunk.strip(),
                        'url': document['url'],
                        'categories': document.get('categories', []),
                        'scraped_at': document['scraped_at'],
                        'document_type': document.get('document_type', 'webpage')
                    }
                    chunks.append(chunk)

                # Carry a short tail of the previous chunk forward as overlap.
                tail = current_chunk.strip()[-overlap:] if overlap else ""
                current_chunk = (tail + "\n\n" if tail else "") + para + "\n\n"
                current_length = len(current_chunk)

        # Flush the final chunk.
        if current_chunk:
            chunk = {
                'title': document['title'],
                'content': current_chunk.strip(),
                'url': document['url'],
                'categories': document.get('categories', []),
                'scraped_at': document['scraped_at'],
                'document_type': document.get('document_type', 'webpage')
            }
            chunks.append(chunk)

        return chunks

    def create_chunks(self) -> List[Dict[str, Any]]:
        """Process all documents and create chunks."""
        all_chunks = []

        documents = self.load_documents()
        print(f"Loaded {len(documents)} documents")

        for document in tqdm(documents, desc="Chunking documents"):
            # FAQs are always split into question/answer chunks.
            faq_chunks = self.chunk_faqs(document)
            all_chunks.extend(faq_chunks)

            # Prefer heading-based chunks; fall back to size-based chunks
            # when no heading chunks could be produced.
            heading_chunks = self.chunk_by_headings(document)
            all_chunks.extend(heading_chunks)

            if not heading_chunks:
                semantic_chunks = self.chunk_semantically(document)
                all_chunks.extend(semantic_chunks)

        # Persist the chunks so they can be reused without re-chunking.
        with open(os.path.join(self.output_dir, 'chunks.json'), 'w') as f:
            json.dump(all_chunks, f, indent=2)

        print(f"Created {len(all_chunks)} chunks")
        return all_chunks

    def create_embeddings(self, chunks: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
        """Create embeddings for all chunks."""
        if chunks is None:
            # Reuse previously saved chunks when available.
            chunks_path = os.path.join(self.output_dir, 'chunks.json')
            if os.path.exists(chunks_path):
                with open(chunks_path, 'r') as f:
                    chunks = json.load(f)
            else:
                chunks = self.create_chunks()

        texts = []
        for chunk in chunks:
            # Prepend the title (the FAQ question for FAQ chunks) so it
            # contributes to the embedding.
            texts.append(f"{chunk['title']} {chunk['content']}")

        print("Creating embeddings...")
        embeddings = self.model.encode(texts, show_progress_bar=True)

        # Map a stable chunk id to its vector and the original chunk record.
        embedding_map = {}
        for i, chunk in enumerate(chunks):
            chunk_id = f"chunk_{i}"
            embedding_map[chunk_id] = {
                'embedding': embeddings[i],
                'chunk': chunk
            }

        with open(os.path.join(self.embedding_dir, 'embeddings.pkl'), 'wb') as f:
            pickle.dump(embedding_map, f)

        print(f"Created embeddings for {len(chunks)} chunks")
        return embedding_map


if __name__ == "__main__":
    chunker = DocumentChunker()
    chunks = chunker.create_chunks()
    embedding_map = chunker.create_embeddings(chunks)
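
    # Illustrative sketch (not part of the original pipeline): one way the
    # embedding map built above could be queried downstream, assuming cosine
    # similarity over the stored vectors and reusing the same model. The
    # helper name `search_chunks` and the example query are hypothetical.
    import numpy as np

    def search_chunks(query: str, top_k: int = 3):
        """Return the top_k chunks most similar to the query."""
        query_vec = chunker.model.encode([query])[0]
        scored = []
        for entry in embedding_map.values():
            vec = entry['embedding']
            score = float(np.dot(query_vec, vec)
                          / (np.linalg.norm(query_vec) * np.linalg.norm(vec)))
            scored.append((score, entry['chunk']))
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return scored[:top_k]

    for score, chunk in search_chunks("example question"):
        print(f"{score:.3f}  {chunk['title']}")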