| """
|
| Content Parsing Module
|
| Handles extraction of content from PDFs, text, and webpages
|
| """
|
|
|
import time
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup
from langchain.schema import Document
from langchain_community.document_loaders import PyPDFLoader
|
|
|
|
|
class BaseParser:
    """Common interface shared by every content parser in this module."""

    def __init__(self):
        # Concrete parsers overwrite this with the formats/schemes they accept.
        self.supported_formats = list()

    def parse(self, source: str) -> List[Document]:
        """
        Convert ``source`` into LangChain Documents.

        The base class provides no implementation; it always raises
        ``NotImplementedError`` so that subclasses are forced to override it.
        """
        raise NotImplementedError("Subclasses must implement parse method")

    def validate_source(self, source: str) -> bool:
        """Report whether ``source`` can be processed (the base class accepts anything)."""
        return True
|
|
|
|
|
class PDFParser(BaseParser):
    """Parser for PDF documents, backed by LangChain's ``PyPDFLoader``."""

    def __init__(self):
        super().__init__()
        self.supported_formats = ['.pdf']

    def parse(self, pdf_path: str) -> List[Document]:
        """
        Parse a PDF file and return a list of Document objects.

        The loader's ``load_and_split()`` is used, so one PDF page may be
        returned as several chunks. Each chunk's metadata is extended with
        ``source_type``, ``page_number`` (1-based), ``total_pages``,
        ``chunk_index`` (0-based), ``total_chunks`` and ``parser``.

        Args:
            pdf_path (str): Path to the PDF file

        Returns:
            List[Document]: List of parsed documents with metadata

        Raises:
            Exception: If loading or splitting fails; the original error is
                attached as ``__cause__``.
        """
        try:
            loader = PyPDFLoader(pdf_path)
            documents = loader.load_and_split()

            total_chunks = len(documents)
            total_pages = self._count_pages(documents)

            for index, doc in enumerate(documents):
                # NOTE(review): PyPDFLoader is documented to store the
                # zero-based page index under 'page' - confirm for the
                # installed version. A chunk's list position is only used as
                # the page number when that key is missing.
                page_index = doc.metadata.get('page')
                if isinstance(page_index, int):
                    page_number = page_index + 1
                else:
                    page_number = index + 1

                doc.metadata.update({
                    'source_type': 'pdf',
                    'page_number': page_number,
                    'total_pages': total_pages,
                    'chunk_index': index,
                    'total_chunks': total_chunks,
                    'parser': 'PDFParser',
                })

            return documents

        except Exception as e:
            raise Exception(f"Error parsing PDF: {str(e)}") from e

    @staticmethod
    def _count_pages(documents: List[Document]) -> int:
        """Best-effort page count for a list of chunks produced from one PDF."""
        if not documents:
            return 0

        # Prefer a page count already present in the loader's metadata.
        reported = documents[0].metadata.get('total_pages')
        if isinstance(reported, int) and reported > 0:
            return reported

        # Otherwise infer it from the highest page index seen on any chunk.
        page_indexes = [
            doc.metadata.get('page')
            for doc in documents
            if isinstance(doc.metadata.get('page'), int)
        ]
        if page_indexes:
            return max(page_indexes) + 1

        # No page information at all: fall back to the number of chunks.
        return len(documents)

    def get_pdf_metadata(self, pdf_path: str) -> Dict[str, Any]:
        """
        Extract summary statistics from a PDF file.

        Returns a dict with page/word counts, or a dict containing only an
        ``'error'`` key when the file cannot be loaded (no exception escapes).
        """
        try:
            loader = PyPDFLoader(pdf_path)
            documents = loader.load()

            total_pages = len(documents)
            total_words = sum(len(doc.page_content.split()) for doc in documents)

            return {
                'total_pages': total_pages,
                'total_words': total_words,
                'average_words_per_page': total_words / total_pages if total_pages > 0 else 0,
                'file_type': 'PDF',
                'parser_used': 'PyPDFLoader',
            }

        except Exception as e:
            return {'error': f"Could not extract metadata: {str(e)}"}
|
|
|
|
|
class TextParser(BaseParser):
    """Parser for plain text content"""

    def __init__(self):
        super().__init__()
        self.supported_formats = ['.txt', 'plain_text']
        # Default maximum chunk length, in characters.
        self.chunk_size = 1000

    def parse(self, text_content: str, chunk_size: Optional[int] = None) -> List[Document]:
        """
        Parse text content and return list of Document objects

        Args:
            text_content (str): Raw text content
            chunk_size (int): Optional chunk size (in characters) for splitting
                long texts; ``None`` or ``0`` selects ``self.chunk_size``

        Returns:
            List[Document]: List of documents, potentially chunked

        Raises:
            Exception: If the text is blank, ``chunk_size`` is negative, or any
                other error occurs; the original error is attached as
                ``__cause__``.
        """
        try:
            if not text_content.strip():
                raise ValueError("Empty text content provided")

            chunk_size = chunk_size or self.chunk_size
            if chunk_size < 0:
                raise ValueError("chunk_size must be a positive integer")

            # Text that already fits is kept verbatim as a single chunk.
            if len(text_content) <= chunk_size:
                chunks = [text_content]
            else:
                chunks = self._split_text_into_chunks(text_content, chunk_size)

            total_chunks = len(chunks)
            return [
                self._build_document(chunk, index, total_chunks)
                for index, chunk in enumerate(chunks)
            ]

        except Exception as e:
            raise Exception(f"Error parsing text: {str(e)}") from e

    @staticmethod
    def _build_document(content: str, chunk_index: int, total_chunks: int) -> Document:
        """Wrap one chunk of text in a Document with the standard metadata."""
        return Document(
            page_content=content,
            metadata={
                'source_type': 'text',
                'word_count': len(content.split()),
                'char_count': len(content),
                'chunk_index': chunk_index,
                'total_chunks': total_chunks,
                'parser': 'TextParser',
            },
        )

    def _split_text_into_chunks(self, text: str, chunk_size: int) -> List[str]:
        """
        Split text into chunks while preserving sentence boundaries.

        Sentences are delimited by ``'. '``. A single sentence longer than
        ``chunk_size`` is emitted as its own, oversized chunk rather than being
        cut in the middle.
        """
        sentences = text.split('. ')
        last_index = len(sentences) - 1
        chunks = []
        current_chunk = ""

        for index, sentence in enumerate(sentences):
            # split() consumed a '. ' after every piece except the last one, so
            # only those pieces get it back; re-adding it to the final piece
            # would invent punctuation that was never in the input.
            piece = sentence if index == last_index else sentence + ". "
            candidate = current_chunk + piece

            if len(candidate) <= chunk_size:
                current_chunk = candidate
                continue

            if current_chunk.strip():
                chunks.append(current_chunk.strip())
            current_chunk = piece

        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        return chunks

    def analyze_text_structure(self, text_content: str) -> Dict[str, Any]:
        """
        Analyze the structure and characteristics of text content.

        Returns a dict of counts and averages, or a dict containing only an
        ``'error'`` key if the analysis fails (no exception escapes).
        """
        try:
            lines = text_content.split('\n')
            words = text_content.split()
            # Drop the empty fragments split('.') leaves behind (e.g. after a
            # trailing period) so counts and averages use the same sentences.
            sentences = [s for s in text_content.split('.') if s.strip()]
            paragraphs = [p.strip() for p in text_content.split('\n\n') if p.strip()]

            word_count = len(words)
            sentence_count = len(sentences)
            paragraph_count = len(paragraphs)

            return {
                'total_words': word_count,
                'total_sentences': sentence_count,
                'total_lines': len(lines),
                'total_paragraphs': paragraph_count,
                'average_words_per_sentence': word_count / sentence_count if sentence_count else 0,
                'average_sentences_per_paragraph': sentence_count / paragraph_count if paragraph_count else 0,
                'character_count': len(text_content),
                # Reading time is computed at 200 words per minute.
                'reading_time_minutes': word_count / 200,
                'complexity_score': self._calculate_text_complexity(text_content),
            }

        except Exception as e:
            return {'error': f"Could not analyze text structure: {str(e)}"}

    def _calculate_text_complexity(self, text: str) -> float:
        """
        Calculate a simple text complexity score.

        The score is ``0.1 * average words per sentence + 0.5 * average
        characters per word``, capped at 10.0; text without any sentence
        scores 0.0.
        """
        words = text.split()
        sentences = [s for s in text.split('.') if s.strip()]

        if not sentences:
            return 0.0

        avg_words_per_sentence = len(words) / len(sentences)
        avg_chars_per_word = sum(len(word) for word in words) / len(words) if words else 0

        complexity = (avg_words_per_sentence * 0.1) + (avg_chars_per_word * 0.5)
        return min(complexity, 10.0)
|
|
|
|
|
class WebpageParser(BaseParser):
    """Parser for web content fetched with ``requests`` and scraped with BeautifulSoup."""

    # URL path suffixes treated as downloadable assets rather than crawlable pages.
    _NON_PAGE_EXTENSIONS = ('.pdf', '.jpg', '.png', '.gif', '.zip')

    # CSS selectors tried, in order, to locate the main content container.
    _CONTENT_SELECTORS = (
        'main',
        'article',
        '[role="main"]',
        '.content',
        '.main-content',
        '#content',
        '#main',
        '.post-content',
        '.entry-content',
    )

    def __init__(self):
        super().__init__()
        self.supported_formats = ['http', 'https']
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.timeout = 10
        self.max_retries = 3
        # Seconds to pause between two consecutive page fetches.
        self.request_delay = 1

    def parse(self, source: str) -> List[Document]:
        """
        Parse the page at ``source`` into LangChain Documents.

        Implements the ``BaseParser.parse`` contract on top of
        ``parse_website`` (single page, no subpages).

        Args:
            source (str): URL of the page to parse

        Returns:
            List[Document]: One Document per parsed page, whose content is the
                cleaned page text

        Raises:
            Exception: If the page could not be fetched or parsed.
        """
        documents = []
        for page in self.parse_website(source):
            if 'error' in page:
                raise Exception(f"Error parsing website: {page['error']}")
            documents.append(Document(
                page_content=page['content'],
                metadata={
                    'source_type': 'webpage',
                    'source': page['url'],
                    'title': page['title'],
                    'description': page['description'],
                    'word_count': page['word_count'],
                    'char_count': page['char_count'],
                    'parser': 'WebpageParser',
                },
            ))
        return documents

    def parse_website(self, url: str, max_pages: int = 1, include_subpages: bool = False) -> List[Dict[str, Any]]:
        """
        Parse website content and return structured data

        Args:
            url (str): Website URL to parse
            max_pages (int): Maximum number of pages to parse
            include_subpages (bool): Whether to include subpages

        Returns:
            List[Dict]: List of page data with content and metadata. A page
                that failed to parse is represented by a dict holding only
                ``'url'`` and ``'error'``.

        Raises:
            Exception: On any unexpected failure; the original error is
                attached as ``__cause__``.
        """
        try:
            urls_to_process = [url]
            if include_subpages and max_pages > 1:
                urls_to_process.extend(self._find_subpages(url, max_pages - 1))

            pages_data = []
            processed_urls = set()

            for current_url in urls_to_process[:max_pages]:
                if current_url in processed_urls:
                    continue

                # Pause only between requests: nothing is gained by waiting
                # before the first fetch or after the last one.
                if processed_urls:
                    time.sleep(self.request_delay)

                page_data = self._parse_single_page(current_url)
                if page_data:
                    pages_data.append(page_data)
                processed_urls.add(current_url)

            return pages_data

        except Exception as e:
            raise Exception(f"Error parsing website: {str(e)}") from e

    def _fetch(self, url: str):
        """
        GET ``url`` with up to ``self.max_retries`` attempts and exponential backoff.

        Returns the successful response, or ``None`` when ``max_retries`` is
        zero. The last ``requests.RequestException`` is re-raised once all
        attempts are exhausted.
        """
        for attempt in range(self.max_retries):
            try:
                response = requests.get(url, headers=self.headers, timeout=self.timeout)
                response.raise_for_status()
                return response
            except requests.RequestException:
                if attempt == self.max_retries - 1:
                    raise
                time.sleep(2 ** attempt)
        return None

    def _parse_single_page(self, url: str) -> Optional[Dict[str, Any]]:
        """
        Parse a single webpage and extract content.

        Returns a dict of page data, a dict with ``'url'`` and ``'error'`` when
        fetching or parsing fails, or ``None`` when no request was attempted.
        """
        try:
            response = self._fetch(url)
            # Compare against None explicitly: the truthiness of a Response
            # reflects its HTTP status, not whether a response exists.
            if response is None:
                return None

            soup = BeautifulSoup(response.content, 'html.parser')

            # Strip non-content elements. Headings, links and images are
            # extracted afterwards, so anything inside these is not reported.
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
                element.decompose()

            main_content = self._extract_main_content(soup)

            title = self._extract_title(soup)
            description = self._extract_description(soup)
            headings = self._extract_headings(soup)
            links = self._extract_links(soup, url)

            cleaned_text = self._clean_text_content(main_content)

            return {
                'url': url,
                'title': title,
                'description': description,
                'content': cleaned_text,
                'headings': headings,
                'internal_links': links['internal'],
                'external_links': links['external'],
                'word_count': len(cleaned_text.split()),
                'char_count': len(cleaned_text),
                'meta_keywords': self._extract_meta_keywords(soup),
                'images': self._extract_images(soup, url),
                'parser': 'WebpageParser',
                'parsed_at': time.strftime('%Y-%m-%d %H:%M:%S'),
            }

        except Exception as e:
            return {'url': url, 'error': f"Failed to parse page: {str(e)}"}

    def _extract_main_content(self, soup: BeautifulSoup) -> str:
        """
        Extract the main content from the page.

        The first matching selector in ``_CONTENT_SELECTORS`` wins; otherwise
        the ``<body>`` text, and finally the whole document text, is used.
        """
        for selector in self._CONTENT_SELECTORS:
            element = soup.select_one(selector)
            if element is not None:
                return element.get_text(separator=' ', strip=True)

        body = soup.find('body')
        if body is not None:
            return body.get_text(separator=' ', strip=True)

        return soup.get_text(separator=' ', strip=True)

    def _extract_title(self, soup: BeautifulSoup) -> str:
        """Extract the page title, falling back to the first ``<h1>``."""
        for tag_name in ('title', 'h1'):
            tag = soup.find(tag_name)
            if tag is not None:
                return tag.get_text().strip()

        return "No Title Found"

    def _extract_description(self, soup: BeautifulSoup) -> str:
        """Extract the meta description, falling back to ``og:description``."""
        lookups = (
            {'name': 'description'},
            {'property': 'og:description'},
        )
        for attrs in lookups:
            meta = soup.find('meta', attrs=attrs)
            if meta is not None and meta.get('content'):
                return meta['content'].strip()

        return "No Description Found"

    def _extract_headings(self, soup: BeautifulSoup) -> List[Dict[str, Any]]:
        """Extract all non-empty headings, grouped by level (h1 first, then h2, ...)."""
        headings = []

        for level in range(1, 7):
            for heading in soup.find_all(f'h{level}'):
                text = heading.get_text(strip=True)
                if text:
                    headings.append({
                        'level': level,
                        'text': text,
                        'id': heading.get('id', ''),
                        'class': heading.get('class', []),
                    })

        return headings

    def _extract_links(self, soup: BeautifulSoup, base_url: str) -> Dict[str, List[str]]:
        """
        Extract internal and external links.

        Links are resolved against ``base_url``; a link is internal when its
        host equals the host of ``base_url``. Links without a host (for
        example ``mailto:``) are ignored. Each list is de-duplicated and keeps
        document order.
        """
        base_domain = urlparse(base_url).netloc
        # dicts are used as ordered sets so the output order is deterministic.
        internal_links = {}
        external_links = {}

        for link in soup.find_all('a', href=True):
            full_url = urljoin(base_url, link['href'])
            netloc = urlparse(full_url).netloc

            if netloc == base_domain:
                internal_links[full_url] = None
            elif netloc:
                external_links[full_url] = None

        return {
            'internal': list(internal_links),
            'external': list(external_links),
        }

    def _extract_meta_keywords(self, soup: BeautifulSoup) -> List[str]:
        """Extract the comma-separated meta keywords, if available."""
        meta_keywords = soup.find('meta', attrs={'name': 'keywords'})
        if meta_keywords is not None and meta_keywords.get('content'):
            keywords = meta_keywords['content'].split(',')
            return [kw.strip() for kw in keywords if kw.strip()]
        return []

    def _extract_images(self, soup: BeautifulSoup, base_url: str) -> List[Dict[str, str]]:
        """Extract absolute source URL, alt text and title of every ``<img>`` with a ``src``."""
        images = []

        for img in soup.find_all('img'):
            src = img.get('src')
            if src:
                images.append({
                    'src': urljoin(base_url, src),
                    'alt': img.get('alt', ''),
                    'title': img.get('title', ''),
                })

        return images

    def _clean_text_content(self, text: str) -> str:
        """
        Clean and normalize text content.

        Lines are stripped, lines of at most one character are dropped, the
        rest are joined with single spaces, and runs of spaces are collapsed.
        """
        if not text:
            return ""

        stripped_lines = (line.strip() for line in text.split('\n'))
        joined = ' '.join(line for line in stripped_lines if len(line) > 1)

        # Collapse runs of spaces in one pass; empty fragments between
        # consecutive spaces are simply skipped.
        return ' '.join(part for part in joined.split(' ') if part)

    def _find_subpages(self, url: str, max_subpages: int) -> List[str]:
        """
        Find subpages from the main page.

        Returns at most ``max_subpages`` same-host HTTP(S) links, in document
        order and without duplicates. Fragments are removed, so ``page#a`` and
        ``page#b`` count once, and links back to ``url`` itself or to asset
        files are skipped. Any failure yields an empty list.
        """
        if max_subpages <= 0:
            return []

        try:
            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')
            base = urlparse(url)
            base_domain = base.netloc
            base_without_fragment = base._replace(fragment='').geturl()
            subpages = {}  # ordered set of discovered URLs

            for link in soup.find_all('a', href=True):
                parsed_url = urlparse(urljoin(url, link['href']))
                candidate = parsed_url._replace(fragment='').geturl()

                if parsed_url.scheme not in ('http', 'https'):
                    continue
                if parsed_url.netloc != base_domain:
                    continue
                if candidate in (url, base_without_fragment):
                    continue
                # Test the path suffix only, so an extension-like substring
                # elsewhere in the URL does not exclude a real page.
                if parsed_url.path.lower().endswith(self._NON_PAGE_EXTENSIONS):
                    continue

                subpages[candidate] = None
                if len(subpages) >= max_subpages:
                    break

            return list(subpages)

        except Exception:
            # Subpage discovery is best-effort; the caller still parses the main page.
            return []

    def validate_url(self, url: str) -> bool:
        """
        Validate if URL is accessible.

        Sends a HEAD request, following redirects, and returns ``True`` only
        for a final 200 response. Any request error yields ``False``.
        """
        try:
            response = requests.head(
                url,
                headers=self.headers,
                timeout=5,
                allow_redirects=True,
            )
            return response.status_code == 200
        except Exception:
            # Deliberately broad (this is a yes/no probe), but unlike a bare
            # ``except`` it lets KeyboardInterrupt/SystemExit propagate.
            return False

    def get_website_info(self, url: str) -> Dict[str, Any]:
        """
        Get basic information about a website.

        Returns a dict of page and response metadata, or a dict with ``'url'``
        and ``'error'`` when the request or parsing fails.
        """
        try:
            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # The language is declared on the <html> element; the soup root
            # object itself carries no attributes.
            html_tag = soup.find('html')
            language = html_tag.get('lang', 'unknown') if html_tag is not None else 'unknown'

            return {
                'url': url,
                'title': self._extract_title(soup),
                'description': self._extract_description(soup),
                'meta_keywords': self._extract_meta_keywords(soup),
                'has_robots_meta': bool(soup.find('meta', attrs={'name': 'robots'})),
                'has_viewport_meta': bool(soup.find('meta', attrs={'name': 'viewport'})),
                'language': language,
                'status_code': response.status_code,
                'content_type': response.headers.get('content-type', 'unknown'),
                'server': response.headers.get('server', 'unknown'),
            }

        except Exception as e:
            return {'url': url, 'error': f"Could not get website info: {str(e)}"}
|
|
|
|
|
class ParserFactory:
    """Factory class to create appropriate parsers"""

    @staticmethod
    def get_parser(source_type: str):
        """
        Get the appropriate parser for the source type.

        Args:
            source_type (str): One of ``'pdf'``, ``'text'``, ``'webpage'`` or
                ``'url'`` (case-insensitive, surrounding whitespace ignored)

        Returns:
            A new parser instance, or ``None`` for an unknown source type.
        """
        # Map to classes, not instances, so only the requested parser is built.
        parser_classes = {
            'pdf': PDFParser,
            'text': TextParser,
            'webpage': WebpageParser,
            'url': WebpageParser,
        }

        parser_class = parser_classes.get(source_type.strip().lower())
        return parser_class() if parser_class is not None else None

    @staticmethod
    def detect_source_type(source: str) -> str:
        """
        Detect the type of content source.

        Returns ``'webpage'`` for strings starting with ``http://`` or
        ``https://``, ``'pdf'`` for strings ending in ``.pdf``, and ``'text'``
        otherwise. Both checks ignore letter case.
        """
        normalized = source.lower()

        if normalized.startswith(('http://', 'https://')):
            return 'webpage'
        if normalized.endswith('.pdf'):
            return 'pdf'
        return 'text'