import json
import logging
import os
from typing import Any, Dict, List

from bs4 import BeautifulSoup
from haystack import Document
from haystack.components.preprocessors.document_splitter import DocumentSplitter
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def load_json_data(file_path: str) -> List[Dict[str, str]]:
    """
    Load data from a JSON file.

    Args:
        file_path: Path to the JSON file

    Returns:
        List of dictionaries containing the data
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        logger.info(f"Successfully loaded {len(data)} records from {file_path}")
        return data
    except Exception as e:
        logger.error(f"Error loading JSON data: {e}")
        return []
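
# Illustrative usage (comment only, not executed at import time). The JSON file is
# assumed to be a list of objects with "url" and "content" fields; the file name
# below is hypothetical:
#
#   records = load_json_data("scraped_pages.json")
#   # records -> [{"url": "https://example.com/p1", "content": "Page text ..."}, ...]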

def extract_text_from_html(html_content: str) -> str:
    """
    Extract text content from HTML.

    Args:
        html_content: HTML content as string

    Returns:
        Extracted text content
    """
    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        # Remove script and style elements
        for script in soup(["script", "style"]):
            script.extract()
        # Get text
        text = soup.get_text(separator=' ', strip=True)
        # Collapse extra whitespace
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        text = ' '.join(chunk for chunk in chunks if chunk)
        return text
    except Exception as e:
        logger.error(f"Error extracting text from HTML: {e}")
        return ""

def process_documents(data: List[Dict[str, str]]) -> List[Dict[str, Any]]:
    """
    Process documents from the dataset.

    Args:
        data: List of dictionaries containing url and content fields

    Returns:
        List of processed documents with text content
    """
    processed_docs = []
    for i, item in enumerate(data):
        try:
            url = item.get('url', '')
            content = item.get('content', '')
            if not url or not content:
                continue
            # NOTE: the 'content' field is used as-is; the HTML extraction step
            # (extract_text_from_html) is left disabled here.
            # Create document with metadata
            doc = {
                'content': content,
                'meta': {
                    'url': url,
                    'doc_id': f"doc_{i}"
                }
            }
            processed_docs.append(doc)
        except Exception as e:
            logger.error(f"Error processing document {i}: {e}")
    logger.info(f"Successfully processed {len(processed_docs)} documents")
    return processed_docs
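
# Each processed record is a plain dict in the shape split_documents() expects:
#
#   process_documents([{"url": "https://example.com/p1", "content": "Page text"}])
#   # -> [{"content": "Page text", "meta": {"url": "https://example.com/p1", "doc_id": "doc_0"}}]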

def split_documents(docs: List[Dict[str, Any]], chunk_size: int = 500, overlap: int = 50) -> List[Dict[str, Any]]:
    """
    Split documents into smaller chunks for better retrieval using Haystack.

    Args:
        docs: List of processed documents
        chunk_size: Passed to the splitter as split_length (measured in words,
            the DocumentSplitter default); also used as the character threshold
            below which a document is kept whole
        overlap: Overlap between consecutive chunks, in the same split units

    Returns:
        List of document chunks; short documents are kept as the original dicts,
        split documents are returned as Haystack Document objects
    """
    # Initialize the Haystack document splitter.
    # split_by defaults to "word", so split_length/split_overlap are word counts.
    document_splitter = DocumentSplitter(
        split_length=chunk_size,
        split_overlap=overlap
    )
    chunked_docs = []
    for doc in docs:
        # If content is shorter than chunk_size characters, keep it as is
        if len(doc['content']) <= chunk_size:
            chunked_docs.append(doc)
            continue
        # Prepare document for the Haystack splitter
        haystack_doc = Document(
            content=doc['content'],
            meta=doc['meta']
        )
        # Split the document
        result = document_splitter.run(documents=[haystack_doc])
        split_docs = result["documents"]
        # Update document IDs for the chunks
        for i, split_doc in enumerate(split_docs):
            split_doc.meta["doc_id"] = f"{doc['meta']['doc_id']}_chunk_{i}"
            split_doc.meta["chunk_id"] = i
            chunked_docs.append(split_doc)
    logger.info(f"Split {len(docs)} documents into {len(chunked_docs)} chunks")
    return chunked_docs
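
# Minimal sketch of the chunking behaviour; the exact chunk count depends on the
# word-based splitter defaults, so the numbers below are illustrative only:
#
#   docs = [{"content": "word " * 1200, "meta": {"url": "u", "doc_id": "doc_0"}}]
#   chunks = split_documents(docs, chunk_size=500, overlap=50)
#   # chunks[0].meta -> {"url": "u", "doc_id": "doc_0_chunk_0", "chunk_id": 0, ...}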

if __name__ == "__main__":
    # Test the functions
    data_path = "ltu_programme_data.json"
    if os.path.exists(data_path):
        data = load_json_data(data_path)
        processed_docs = process_documents(data[:5])  # Process first 5 docs as a test
        chunked_docs = split_documents(processed_docs)
        print(f"Processed {len(processed_docs)} documents into {len(chunked_docs)} chunks")