import json
import logging
import os
from typing import List, Dict, Any

from bs4 import BeautifulSoup
from haystack import Document
from haystack.components.preprocessors.document_splitter import DocumentSplitter

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def load_json_data(file_path: str) -> List[Dict[str, str]]:
    """
    Load data from a JSON file.

    Args:
        file_path: Path to the JSON file

    Returns:
        List of dictionaries containing the data
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        logger.info(f"Successfully loaded {len(data)} records from {file_path}")
        return data
    except Exception as e:
        logger.error(f"Error loading JSON data: {e}")
        return []
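
# Illustrative input shape (an assumption, inferred from how process_documents
# below reads each record): the JSON file is expected to hold a list of
# objects with "url" and "content" keys, e.g.
# [
#     {"url": "https://example.edu/programme", "content": "Programme description ..."},
#     ...
# ]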


def extract_text_from_html(html_content: str) -> str:
    """
    Extract text content from HTML.

    Args:
        html_content: HTML content as a string

    Returns:
        Extracted text content
    """
    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.extract()
        # Get the visible text
        text = soup.get_text(separator=' ', strip=True)
        # Collapse extra whitespace
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        text = ' '.join(chunk for chunk in chunks if chunk)
        return text
    except Exception as e:
        logger.error(f"Error extracting text from HTML: {e}")
        return ""


def process_documents(data: List[Dict[str, str]]) -> List[Dict[str, Any]]:
    """
    Process documents from the dataset.

    Args:
        data: List of dictionaries containing url and content fields

    Returns:
        List of processed documents with text content
    """
    processed_docs = []
    for i, item in enumerate(data):
        try:
            url = item.get('url', '')
            content = item.get('content', '')
            if not url or not content:
                continue
            # HTML extraction is currently disabled; the 'content' field is
            # already plain text:
            # text = extract_text_from_html(content)
            # if not text:
            #     continue
            # Create a document with metadata
            doc = {
                'content': content,
                'meta': {
                    'url': url,
                    'doc_id': f"doc_{i}"
                }
            }
            processed_docs.append(doc)
        except Exception as e:
            logger.error(f"Error processing document {i}: {e}")
    logger.info(f"Successfully processed {len(processed_docs)} documents")
    return processed_docs
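
# Illustrative output shape (matches the dict built above; values are made up):
# [{'content': 'Programme description ...',
#   'meta': {'url': 'https://example.edu/programme', 'doc_id': 'doc_0'}}, ...]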


def split_documents(docs: List[Dict[str, Any]], chunk_size: int = 500, overlap: int = 50) -> List[Dict[str, Any]]:
    """
    Split documents into smaller chunks for better retrieval using Haystack.

    Args:
        docs: List of processed documents
        chunk_size: Length of each chunk in split units (words, the
            DocumentSplitter default)
        overlap: Overlap between chunks in the same units

    Returns:
        List of document chunks
    """
    # Initialize the Haystack document splitter
    # (split_by is left at its default, which splits by word)
    document_splitter = DocumentSplitter(
        # split_by="character",
        split_length=chunk_size,
        split_overlap=overlap
    )
    chunked_docs = []
    for doc in docs:
        # If the content is short (character-length check), keep it as is
        if len(doc['content']) <= chunk_size:
            chunked_docs.append(doc)
            continue
        # Wrap the dict in a Haystack Document for the splitter
        haystack_doc = Document(
            content=doc['content'],
            meta=doc['meta']
        )
        # Split the document
        result = document_splitter.run(documents=[haystack_doc])
        split_docs = result["documents"]
        # Update document IDs for the chunks and convert back to plain dicts
        # so the return type matches the short-document branch above
        for i, split_doc in enumerate(split_docs):
            split_doc.meta["doc_id"] = f"{doc['meta']['doc_id']}_chunk_{i}"
            split_doc.meta["chunk_id"] = i
            chunked_docs.append({'content': split_doc.content, 'meta': split_doc.meta})
    logger.info(f"Split {len(docs)} documents into {len(chunked_docs)} chunks")
    return chunked_docs
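
# Illustrative chunk shape (assumed): the splitter carries over the source
# meta, and this function adds a per-chunk 'doc_id' and 'chunk_id', e.g.
# {'content': '...', 'meta': {'url': '...', 'doc_id': 'doc_0_chunk_1', 'chunk_id': 1}}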


if __name__ == "__main__":
    # Quick smoke test of the preprocessing pipeline
    data_path = "ltu_programme_data.json"
    if os.path.exists(data_path):
        data = load_json_data(data_path)
        processed_docs = process_documents(data[:5])  # Process the first 5 docs as a test
        chunked_docs = split_documents(processed_docs)
        print(f"Processed {len(processed_docs)} documents into {len(chunked_docs)} chunks")