import os
import io
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple, Union
import hashlib
import time

import streamlit as st

from config import Config


class BulletproofDocumentProcessor:
    """
    Bulletproof PDF processor designed for maximum compatibility and reliability.

    This processor implements a multi-strategy extraction approach with intelligent
    fallbacks, avoiding complex dependencies while ensuring robust text extraction
    from diverse PDF formats commonly found in HR documentation.

    Architecture:
    - Primary: Native text extraction using minimal libraries
    - Secondary: Byte-level pattern matching for encoded content
    - Tertiary: Manual content stream parsing for complex PDFs
    - Fallback: User-guided content input for problematic files
    """

    def __init__(self):
        self.config = Config()
        self.embedding_model = self._initialize_embedding_engine()
        self.extraction_stats = {
            'attempts': 0,
            'successes': 0,
            'method_effectiveness': {}
        }

    def _initialize_embedding_engine(self):
        """
        Initialize embedding engine with enhanced error handling and fallback mechanisms.

        This method implements a graceful degradation strategy, ensuring the system
        remains functional even if specific embedding libraries encounter issues.
        """
        try:
            from sentence_transformers import SentenceTransformer

            # Use a more compatible model that's less likely to trigger torch issues
            model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')

            # Suppress torch warnings that don't affect functionality
            import warnings
            warnings.filterwarnings("ignore", message=".*torch.classes.*")

            return model
        except Exception as embedding_error:
            st.warning(f"Embedding model initialization issue: {str(embedding_error)}")
            st.info("💡 System will continue with basic functionality. Some features may be limited.")
            return None

    def extract_text_from_pdf(self, pdf_file) -> Optional[str]:
        """
        Bulletproof PDF text extraction using progressive strategy escalation.

        This method implements a sophisticated extraction pipeline that adapts
        to different PDF types and encoding scenarios, ensuring maximum success
        rate across diverse document formats.

        Args:
            pdf_file: PDF file object or path

        Returns:
            Extracted text content or None if all methods fail
        """
        self.extraction_stats['attempts'] += 1

        # Define extraction strategies in order of preference and reliability
        extraction_strategies = [
            ('PyPDF2_Enhanced', self._extract_pypdf2_enhanced),
            ('ByteLevel_Analysis', self._extract_byte_level),
            ('Pattern_Matching', self._extract_pattern_based),
            ('Manual_Parsing', self._extract_manual_streams)
        ]

        # Execute extraction strategies with comprehensive error handling
        for strategy_name, extraction_method in extraction_strategies:
            try:
                st.info(f"🔄 Executing {strategy_name} extraction...")

                # Reset file pointer for each attempt
                self._reset_file_pointer(pdf_file)

                # Execute extraction with timeout protection
                extracted_text = self._execute_with_timeout(
                    extraction_method,
                    pdf_file,
                    timeout_seconds=30
                )

                # Validate extraction quality
                if self._validate_extraction_quality(extracted_text):
                    self._record_success(strategy_name)
                    st.success(f"✅ {strategy_name} extraction successful!")
                    return self._post_process_extracted_text(extracted_text)
                else:
                    st.warning(f"⚠️ {strategy_name} extracted insufficient content")

            except Exception as strategy_error:
                st.warning(f"⚠️ {strategy_name} failed: {str(strategy_error)}")
                self._record_failure(strategy_name, str(strategy_error))
                continue

        # All automated strategies failed - provide comprehensive guidance
        self._handle_extraction_failure(pdf_file)
        return None

    def _extract_pypdf2_enhanced(self, pdf_file) -> str:
        """
        Enhanced PyPDF2 extraction with robust error handling and encoding management.

        This method implements intelligent PDF parsing that handles various
        encoding scenarios and structural anomalies commonly found in HR documents.
        """
        try:
            import PyPDF2

            # Prepare PDF reader with enhanced configuration
            pdf_data = self._read_pdf_data(pdf_file)

            # Create reader with multiple fallback configurations
            reader_configs = [
                {'strict': False, 'password': None},
                {'strict': True, 'password': None},
                {'strict': False, 'password': ''}  # Some PDFs have empty passwords
            ]

            pdf_reader = None
            for config in reader_configs:
                try:
                    pdf_reader = PyPDF2.PdfReader(
                        io.BytesIO(pdf_data),
                        strict=config['strict']
                    )
                    if pdf_reader.is_encrypted and config['password'] is not None:
                        pdf_reader.decrypt(config['password'])
                    break
                except Exception:
                    continue

            if not pdf_reader:
                raise Exception("Could not initialize PDF reader with any configuration")

            # Extract text with page-level error handling
            text_fragments = []
            successful_pages = 0

            for page_index, page in enumerate(pdf_reader.pages):
                try:
                    # Multi-method text extraction per page
                    page_text = self._extract_page_text_robust(page, page_index)
                    if page_text and len(page_text.strip()) > 10:
                        text_fragments.append(f"\n--- Page {page_index + 1} ---\n{page_text}")
                        successful_pages += 1
                except Exception as page_error:
                    # Log page error but continue with other pages
                    st.warning(f"Page {page_index + 1} extraction failed: {str(page_error)}")
                    continue

            if successful_pages == 0:
                raise Exception("No pages yielded readable content")

            return '\n'.join(text_fragments)

        except ImportError:
            raise Exception("PyPDF2 library not available")
        except Exception as e:
            raise Exception(f"PyPDF2 extraction failed: {str(e)}")

    def _extract_page_text_robust(self, page, page_index: int) -> str:
        """
        Robust page-level text extraction with multiple fallback methods.

        This method implements several text extraction approaches for individual
        pages, ensuring maximum content recovery from diverse PDF structures.
        """
        # Primary extraction method
        try:
            text = page.extract_text()
            if text and len(text.strip()) > 10:
                return text
        except Exception:
            pass

        # Secondary extraction: access text objects directly
        try:
            if hasattr(page, 'get_contents') and page.get_contents():
                content_stream = page.get_contents()
                if hasattr(content_stream, 'get_data'):
                    stream_data = content_stream.get_data()
                    decoded_stream = stream_data.decode('latin-1', errors='ignore')

                    # Extract text from stream using safe pattern matching
                    text = self._extract_from_content_stream(decoded_stream)
                    if text and len(text.strip()) > 10:
                        return text
        except Exception:
            pass

        # Tertiary extraction: character mapping approach
        try:
            return self._extract_via_character_mapping(page)
        except Exception:
            pass

        return ""

    def _extract_byte_level(self, pdf_file) -> str:
        """
        Byte-level PDF analysis for extracting text from structurally complex files.

        This method performs low-level byte analysis to identify and extract
        text content from PDFs that resist standard parsing methods.
        """
        pdf_data = self._read_pdf_data(pdf_file)

        # Multi-encoding text extraction strategy
        text_candidates = []

        # Strategy 1: Latin-1 decoding with pattern extraction
        try:
            decoded_content = pdf_data.decode('latin-1', errors='ignore')
            latin_text = self._extract_text_patterns(decoded_content)
            if latin_text:
                text_candidates.append(('latin-1', latin_text))
        except Exception:
            pass

        # Strategy 2: UTF-8 decoding with lenient error handling
        try:
            decoded_content = pdf_data.decode('utf-8', errors='ignore')
            utf8_text = self._extract_text_patterns(decoded_content)
            if utf8_text:
                text_candidates.append(('utf-8', utf8_text))
        except Exception:
            pass

        # Strategy 3: Windows-1252 encoding (common in office documents)
        try:
            decoded_content = pdf_data.decode('cp1252', errors='ignore')
            cp1252_text = self._extract_text_patterns(decoded_content)
            if cp1252_text:
                text_candidates.append(('cp1252', cp1252_text))
        except Exception:
            pass

        # Select best candidate based on content quality metrics
        if text_candidates:
            best_candidate = max(
                text_candidates,
                key=lambda x: self._calculate_text_quality_score(x[1])
            )
            return best_candidate[1]

        raise Exception("Byte-level extraction found no readable content")

    def _extract_text_patterns(self, decoded_content: str) -> str:
        """
        Extract text using safe pattern matching without complex regex.

        This method identifies text content using simple string operations,
        avoiding regex compilation issues while maintaining extraction effectiveness.
        """
        text_fragments = []

        # Extract content between parentheses (common PDF text marker)
        content_length = len(decoded_content)
        i = 0

        while i < content_length - 1:
            if decoded_content[i] == '(':
                # Found potential text start
                j = i + 1
                parenthesis_depth = 1
                extracted_fragment = ""

                # Extract until matching closing parenthesis
                while j < content_length and parenthesis_depth > 0:
                    char = decoded_content[j]
                    if char == '(':
                        parenthesis_depth += 1
                    elif char == ')':
                        parenthesis_depth -= 1

                    if parenthesis_depth > 0:
                        # Handle escape sequences
                        if char == '\\' and j + 1 < content_length:
                            next_char = decoded_content[j + 1]
                            if next_char in 'ntr\\()':
                                escape_map = {'n': '\n', 't': '\t', 'r': '\r', '\\': '\\', '(': '(', ')': ')'}
                                extracted_fragment += escape_map.get(next_char, next_char)
                                j += 2
                            else:
                                extracted_fragment += next_char
                                j += 2
                        else:
                            extracted_fragment += char
                            j += 1
                    else:
                        j += 1

                # Process extracted fragment
                cleaned_fragment = self._clean_text_fragment(extracted_fragment)
                if self._is_meaningful_text(cleaned_fragment):
                    text_fragments.append(cleaned_fragment)

                i = j
            else:
                i += 1

        return ' '.join(text_fragments) if text_fragments else ""
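
    # Illustrative note (not executed): given a hypothetical content-stream
    # fragment such as
    #     (Annual leave\n is accrued monthly) Tj
    # the loop above captures the parenthesised literal, expands the "\n"
    # escape, and _clean_text_fragment() / _is_meaningful_text() reduce it to
    # "Annual leave is accrued monthly" before it is joined with the other
    # recovered fragments.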

    def _extract_pattern_based(self, pdf_file) -> str:
        """
        Pattern-based extraction for identifying text in various PDF structures.

        This method uses content structure analysis to locate and extract
        text from PDFs with non-standard formatting or encoding.
        """
        pdf_data = self._read_pdf_data(pdf_file)
        decoded_content = pdf_data.decode('latin-1', errors='ignore')

        # Define text extraction patterns (using simple string operations)
        extraction_patterns = [
            self._extract_bt_et_blocks,       # Text objects between BT/ET markers
            self._extract_tj_operations,      # Text show operations
            self._extract_font_encoded_text,  # Font-encoded text content
            self._extract_stream_objects      # Direct stream object analysis
        ]

        best_extraction = ""
        best_quality_score = 0

        for pattern_extractor in extraction_patterns:
            try:
                extracted_text = pattern_extractor(decoded_content)
                quality_score = self._calculate_text_quality_score(extracted_text)

                if quality_score > best_quality_score:
                    best_extraction = extracted_text
                    best_quality_score = quality_score

            except Exception as pattern_error:
                st.warning(f"Pattern extraction method failed: {str(pattern_error)}")
                continue

        if best_quality_score > 0.3:  # Minimum quality threshold
            return best_extraction

        raise Exception("Pattern-based extraction found no high-quality content")

    def _extract_bt_et_blocks(self, content: str) -> str:
        """Extract text from BT/ET (Begin Text/End Text) blocks."""
        text_blocks = []

        # Find BT/ET pairs using simple string searching
        bt_positions = []
        et_positions = []

        search_pos = 0
        while True:
            bt_pos = content.find('BT\n', search_pos)
            if bt_pos == -1:
                bt_pos = content.find('BT ', search_pos)
            if bt_pos == -1:
                break
            bt_positions.append(bt_pos)
            search_pos = bt_pos + 1

        search_pos = 0
        while True:
            et_pos = content.find('ET\n', search_pos)
            if et_pos == -1:
                et_pos = content.find('ET ', search_pos)
            if et_pos == -1:
                break
            et_positions.append(et_pos)
            search_pos = et_pos + 1

        # Match BT/ET pairs and extract content
        for bt_pos in bt_positions:
            # Find corresponding ET
            matching_et = None
            for et_pos in et_positions:
                if et_pos > bt_pos:
                    matching_et = et_pos
                    break

            if matching_et:
                block_content = content[bt_pos:matching_et]
                block_text = self._extract_text_from_block(block_content)
                if block_text:
                    text_blocks.append(block_text)

        return ' '.join(text_blocks)
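
    # Illustrative note (not executed): in a hypothetical page stream such as
    #     BT /F1 12 Tf 72 712 Td (Probation period: 90 days) Tj ET
    # the method above records the positions of the BT/ET markers, slices out
    # the span between them, and passes it to _extract_text_from_block(),
    # which is expected to recover the parenthesised literal
    # "Probation period: 90 days".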

    def _extract_manual_streams(self, pdf_file) -> str:
        """
        Manual PDF stream parsing for maximum compatibility.

        This method implements a custom PDF parser that handles edge cases
        and structural variations that standard libraries might miss.
        """
        pdf_data = self._read_pdf_data(pdf_file)

        # Identify and extract content streams
        stream_markers = [b'stream\n', b'stream\r\n', b'stream\r']
        endstream_markers = [b'endstream', b'\nendstream', b'\rendstream']

        extracted_streams = []

        for stream_marker in stream_markers:
            start_pos = 0
            while True:
                stream_start = pdf_data.find(stream_marker, start_pos)
                if stream_start == -1:
                    break

                # Find corresponding endstream
                content_start = stream_start + len(stream_marker)
                stream_end = pdf_data.find(b'endstream', content_start)

                if stream_end != -1:
                    stream_content = pdf_data[content_start:stream_end]

                    # Attempt to decompress if needed
                    decompressed_content = self._attempt_decompression(stream_content)

                    # Extract text from stream
                    stream_text = self._extract_text_from_stream(decompressed_content)
                    if stream_text:
                        extracted_streams.append(stream_text)

                start_pos = stream_end + 1 if stream_end != -1 else stream_start + 1

        combined_text = ' '.join(extracted_streams)
        if len(combined_text.strip()) > 50:
            return combined_text

        raise Exception("Manual stream parsing found insufficient content")

    def _attempt_decompression(self, stream_content: bytes) -> bytes:
        """Attempt to decompress PDF stream content if compressed."""
        try:
            import zlib
            return zlib.decompress(stream_content)
        except Exception:
            try:
                import gzip
                return gzip.decompress(stream_content)
            except Exception:
                return stream_content  # Return as-is if decompression fails

    def _extract_text_from_stream(self, stream_content: bytes) -> str:
        """Extract text content from decompressed PDF stream."""
        try:
            decoded_stream = stream_content.decode('latin-1', errors='ignore')
            return self._extract_text_patterns(decoded_stream)
        except Exception:
            return ""

    # Utility methods for robust extraction

    def _read_pdf_data(self, pdf_file) -> bytes:
        """Safely read PDF data from various input types."""
        if hasattr(pdf_file, 'read'):
            pdf_file.seek(0)
            data = pdf_file.read()
            pdf_file.seek(0)
            return data
        else:
            with open(pdf_file, 'rb') as f:
                return f.read()

    def _reset_file_pointer(self, pdf_file) -> None:
        """Reset file pointer if the file object supports it."""
        if hasattr(pdf_file, 'seek'):
            pdf_file.seek(0)

    def _clean_text_fragment(self, fragment: str) -> str:
        """Clean individual text fragments for better readability."""
        if not fragment:
            return ""

        # Remove non-printable characters
        printable_chars = []
        for char in fragment:
            if 32 <= ord(char) <= 126 or char in '\n\r\t':
                printable_chars.append(char)
            elif ord(char) > 126:  # Allow extended characters
                printable_chars.append(char)
            else:
                printable_chars.append(' ')

        cleaned = ''.join(printable_chars)

        # Normalize whitespace
        words = cleaned.split()
        return ' '.join(words) if words else ""

    def _is_meaningful_text(self, text: str) -> bool:
        """Determine if extracted text contains meaningful content."""
        if not text or len(text.strip()) < 3:
            return False

        # Check for reasonable character distribution
        alphanumeric_count = sum(1 for c in text if c.isalnum())
        total_chars = len(text.replace(' ', ''))

        if total_chars == 0:
            return False

        alphanumeric_ratio = alphanumeric_count / total_chars
        return alphanumeric_ratio > 0.3  # At least 30% alphanumeric

    def _calculate_text_quality_score(self, text: str) -> float:
        """Calculate quality score for extracted text."""
        if not text:
            return 0.0

        # Factors contributing to quality score
        length_score = min(len(text) / 1000, 1.0)  # Longer text generally better

        word_count = len(text.split())
        word_score = min(word_count / 100, 1.0)  # More words generally better

        # Check for common HR terms (bonus points)
        hr_terms = ['policy', 'employee', 'company', 'benefit', 'leave', 'work', 'staff']
        hr_term_count = sum(1 for term in hr_terms if term.lower() in text.lower())
        hr_bonus = min(hr_term_count * 0.1, 0.3)

        # Penalty for excessive repetition
        unique_words = len(set(text.lower().split()))
        repetition_penalty = max(0, (word_count - unique_words * 2) / word_count) if word_count > 0 else 0

        quality_score = (length_score * 0.3 + word_score * 0.4 + hr_bonus) * (1 - repetition_penalty)
        return min(quality_score, 1.0)
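
    # Worked example (not executed): for a hypothetical 500-character extract
    # containing 80 words, 70 of them unique, that mentions two of the HR
    # terms listed above:
    #     length_score       = min(500 / 1000, 1.0) = 0.5
    #     word_score         = min(80 / 100, 1.0)   = 0.8
    #     hr_bonus           = min(2 * 0.1, 0.3)    = 0.2
    #     repetition_penalty = max(0, (80 - 70 * 2) / 80) = 0.0
    #     quality_score      = (0.5 * 0.3 + 0.8 * 0.4 + 0.2) * 1.0 = 0.67
    # comfortably above the 0.3 threshold applied by
    # _validate_extraction_quality().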

    def _validate_extraction_quality(self, text: str) -> bool:
        """Validate that extracted text meets minimum quality standards."""
        if not text or len(text.strip()) < 100:
            return False

        quality_score = self._calculate_text_quality_score(text)
        return quality_score > 0.3

    def _post_process_extracted_text(self, text: str) -> str:
        """Post-process extracted text for optimal readability."""
        if not text:
            return ""

        # Normalize line breaks and spacing
        lines = text.split('\n')
        processed_lines = []

        for line in lines:
            line = line.strip()
            if line and not line.startswith('---'):  # Remove page markers
                processed_lines.append(line)

        # Join lines with appropriate spacing
        result = '\n'.join(processed_lines)

        # Final cleanup
        while '\n\n\n' in result:
            result = result.replace('\n\n\n', '\n\n')

        return result.strip()

    def _execute_with_timeout(self, func, *args, timeout_seconds: int = 30):
        """Execute function with timeout protection."""
        # Simplified timeout implementation for basic protection
        start_time = time.time()

        try:
            result = func(*args)
            elapsed = time.time() - start_time

            if elapsed > timeout_seconds:
                st.warning(f"Operation took {elapsed:.1f}s (longer than expected)")

            return result
        except Exception as e:
            elapsed = time.time() - start_time
            if elapsed > timeout_seconds:
                raise Exception(f"Operation timed out after {elapsed:.1f}s")
            raise e

    def _record_success(self, method: str):
        """Record successful extraction for analytics."""
        self.extraction_stats['successes'] += 1
        if method not in self.extraction_stats['method_effectiveness']:
            self.extraction_stats['method_effectiveness'][method] = {'success': 0, 'total': 0}
        self.extraction_stats['method_effectiveness'][method]['success'] += 1
        self.extraction_stats['method_effectiveness'][method]['total'] += 1

    def _record_failure(self, method: str, error: str):
        """Record failed extraction for analytics."""
        if method not in self.extraction_stats['method_effectiveness']:
            self.extraction_stats['method_effectiveness'][method] = {'success': 0, 'total': 0}
        self.extraction_stats['method_effectiveness'][method]['total'] += 1

    def _handle_extraction_failure(self, pdf_file):
        """Provide comprehensive guidance when all extraction methods fail."""
        st.error("❌ All extraction methods failed. Comprehensive PDF analysis:")

        # Analyze PDF structure for specific guidance
        analysis_results = self._analyze_pdf_structure(pdf_file)

        col1, col2 = st.columns(2)

        with col1:
            st.markdown("**📋 PDF Analysis Results:**")
            for key, value in analysis_results.items():
                st.write(f"• **{key}:** {value}")

        with col2:
            st.markdown("**🛠️ Recommended Solutions:**")
            solutions = self._generate_specific_solutions(analysis_results)
            for solution in solutions:
                st.write(f"• {solution}")

        # Provide manual input option as last resort
        self._offer_manual_input_option()

    def _analyze_pdf_structure(self, pdf_file) -> Dict[str, str]:
        """Analyze PDF structure to provide specific guidance."""
        analysis = {}

        try:
            pdf_data = self._read_pdf_data(pdf_file)

            # Basic file analysis
            analysis['File Size'] = f"{len(pdf_data) / 1024:.1f} KB"
            analysis['PDF Version'] = self._detect_pdf_version(pdf_data)
            analysis['Encryption'] = 'Yes' if b'/Encrypt' in pdf_data else 'No'
            analysis['Images Present'] = 'Yes' if b'/Image' in pdf_data else 'No'
            analysis['Fonts Present'] = 'Yes' if b'/Font' in pdf_data else 'No'
            analysis['Text Objects'] = str(pdf_data.count(b'BT'))

            # Content type detection
            if pdf_data.count(b'BT') == 0 and b'/Image' in pdf_data:
                analysis['Content Type'] = 'Likely scanned/image-based'
            elif pdf_data.count(b'BT') > 0:
                analysis['Content Type'] = 'Text-based'
            else:
                analysis['Content Type'] = 'Unknown/Complex'

        except Exception as e:
            analysis['Analysis Error'] = str(e)

        return analysis

    def _detect_pdf_version(self, pdf_data: bytes) -> str:
        """Detect PDF version from header."""
        try:
            header = pdf_data[:20].decode('ascii', errors='ignore')
            if '%PDF-' in header:
                version_start = header.find('%PDF-') + 5
                version = header[version_start:version_start + 3]
                return version
        except Exception:
            pass
        return 'Unknown'

    def _generate_specific_solutions(self, analysis: Dict[str, str]) -> List[str]:
        """Generate specific solutions based on PDF analysis."""
        solutions = []

        content_type = analysis.get('Content Type', '')
        encryption = analysis.get('Encryption', '')

        if 'scanned' in content_type.lower() or 'image' in content_type.lower():
            solutions.extend([
                "PDF appears to be scanned - use OCR software to convert to text",
                "Try Adobe Acrobat's 'Recognize Text' feature",
                "Consider re-creating document from original source"
            ])

        if encryption == 'Yes':
            solutions.append("Remove password protection before uploading")

        if analysis.get('Text Objects', '0') == '0':
            solutions.extend([
                "No text objects found - likely image-based content",
                "Export from original application (Word, Google Docs) as PDF"
            ])

        # Universal solutions
        solutions.extend([
            "Try 'Print to PDF' from any PDF viewer",
            "Use online PDF converter to optimize format",
            "Contact IT support for complex document conversion"
        ])

        return solutions

    def _offer_manual_input_option(self):
        """Offer manual text input as last resort."""
        with st.expander("🖊️ Manual Text Input (Last Resort)", expanded=False):
            st.markdown("""
            If automatic extraction fails, you can manually input key policy content:
            """)

            manual_text = st.text_area(
                "Paste policy text here:",
                height=200,
                placeholder="Copy and paste the key content from your PDF here..."
            )

            if st.button("📝 Process Manual Input") and manual_text:
                if len(manual_text.strip()) > 100:
                    st.success("✅ Manual input received! Processing...")
                    return manual_text.strip()
                else:
                    st.warning("Please provide more substantial content (at least 100 characters)")

        return None

    # Required interface methods for compatibility

    def create_intelligent_chunks(self, text: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Create optimized text chunks for vector storage."""
        if not text or len(text.strip()) < 50:
            return []

        chunks = []
        chunk_size = self.config.CHUNK_SIZE
        overlap = self.config.CHUNK_OVERLAP

        # Intelligent sentence-based chunking
        sentences = self._split_into_sentences_robust(text)

        current_chunk = ""
        chunk_index = 0

        for sentence in sentences:
            potential_chunk = f"{current_chunk} {sentence}".strip() if current_chunk else sentence

            if len(potential_chunk) <= chunk_size:
                current_chunk = potential_chunk
            else:
                # Save current chunk if meaningful
                if current_chunk and len(current_chunk.strip()) >= 100:
                    chunks.append({
                        'content': current_chunk.strip(),
                        'metadata': {
                            **metadata,
                            'chunk_type': 'intelligent_semantic',
                            'chunk_index': chunk_index,
                            'extraction_method': 'bulletproof_processor'
                        }
                    })
                    chunk_index += 1

                # Start new chunk with smart overlap
                if overlap > 0 and current_chunk:
                    words = current_chunk.split()
                    overlap_words = words[-overlap:] if len(words) > overlap else words
                    current_chunk = " ".join(overlap_words) + " " + sentence
                else:
                    current_chunk = sentence

        # Process final chunk
        if current_chunk and len(current_chunk.strip()) >= 100:
            chunks.append({
                'content': current_chunk.strip(),
                'metadata': {
                    **metadata,
                    'chunk_type': 'intelligent_semantic',
                    'chunk_index': chunk_index,
                    'extraction_method': 'bulletproof_processor'
                }
            })

        return chunks
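
    # Illustrative note (not executed): with hypothetical settings of
    # Config.CHUNK_SIZE = 1000 (characters) and Config.CHUNK_OVERLAP = 50
    # (words), a policy section of roughly 2,500 characters would come out as
    # roughly three to four chunks, each opening with the last ~50 words of
    # the previous chunk so that sentences spanning a boundary remain
    # retrievable.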

    def _split_into_sentences_robust(self, text: str) -> List[str]:
        """Robust sentence splitting optimized for HR documents."""
        sentences = []
        current_sentence = ""

        # Enhanced sentence boundary detection
        sentence_endings = '.!?'
        abbreviations = {'Mr.', 'Mrs.', 'Dr.', 'Inc.', 'Corp.', 'Ltd.', 'Co.', 'etc.', 'vs.'}

        i = 0
        while i < len(text):
            char = text[i]
            current_sentence += char

            if char in sentence_endings:
                # Check if this is a real sentence ending
                is_sentence_end = True

                # Check for abbreviations
                words_before = current_sentence.strip().split()
                if words_before:
                    last_word = words_before[-1]
                    if last_word in abbreviations:
                        is_sentence_end = False

                # Check if followed by lowercase (likely abbreviation)
                if i + 1 < len(text) and text[i + 1].islower():
                    is_sentence_end = False

                if is_sentence_end and len(current_sentence.strip()) > 10:
                    sentences.append(current_sentence.strip())
                    current_sentence = ""

            elif char == '\n' and current_sentence.strip():
                # Force sentence break on newlines
                sentences.append(current_sentence.strip())
                current_sentence = ""

            i += 1

        # Add final sentence
        if current_sentence.strip() and len(current_sentence.strip()) > 10:
            sentences.append(current_sentence.strip())

        return sentences

    def generate_embeddings(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Generate embeddings with robust error handling."""
        if not chunks or not self.embedding_model:
            st.warning("⚠️ Embedding generation unavailable. Documents will be stored without embeddings.")
            return chunks

        enhanced_chunks = []
        progress_bar = st.progress(0)
        status_text = st.empty()

        for i, chunk in enumerate(chunks):
            try:
                progress = (i + 1) / len(chunks)
                progress_bar.progress(progress)
                status_text.text(f"Generating embeddings... {i + 1}/{len(chunks)}")

                # Generate embedding with error handling
                embedding = self.embedding_model.encode(
                    chunk['content'],
                    normalize_embeddings=True,
                    show_progress_bar=False
                ).tolist()

                enhanced_chunk = {
                    **chunk,
                    'embedding': embedding,
                    'embedding_model': 'all-MiniLM-L6-v2',
                    'processed_at': time.time()
                }
                enhanced_chunks.append(enhanced_chunk)

            except Exception as e:
                st.warning(f"Embedding generation failed for chunk {i}: {str(e)}")
                # Add chunk without embedding
                enhanced_chunks.append({
                    **chunk,
                    'embedding': None,
                    'embedding_error': str(e),
                    'processed_at': time.time()
                })

        progress_bar.empty()
        status_text.empty()

        return enhanced_chunks

    def calculate_document_hash(self, pdf_file) -> str:
        """Calculate document hash for deduplication."""
        hasher = hashlib.sha256()
        pdf_data = self._read_pdf_data(pdf_file)
        hasher.update(pdf_data)
        return hasher.hexdigest()

    def process_document(self, pdf_file, filename: str) -> Optional[Dict[str, Any]]:
        """Complete document processing pipeline with comprehensive error handling."""
        try:
            # Calculate document hash
            doc_hash = self.calculate_document_hash(pdf_file)

            # Extract text with bulletproof methods
            st.info(f"📄 Processing {filename} with bulletproof extraction...")
            text_content = self.extract_text_from_pdf(pdf_file)

            if not text_content:
                st.error("❌ Could not extract readable content from PDF")
                return None

            # Create comprehensive metadata
            metadata = {
                'source': filename,
                'document_hash': doc_hash,
                'processed_at': time.time(),
                'content_length': len(text_content),
                'document_type': 'hr_policy',
                'extraction_stats': self.extraction_stats,
                'processor_version': 'bulletproof_v1.0'
            }

            # Create intelligent chunks
            st.info("🧩 Creating intelligent text chunks...")
            chunks = self.create_intelligent_chunks(text_content, metadata)

            if not chunks:
                st.error("❌ Failed to create meaningful chunks from document")
                return None

            # Generate embeddings
            st.info("🧠 Generating semantic embeddings...")
            enhanced_chunks = self.generate_embeddings(chunks)

            # Prepare final document package
            processed_doc = {
                'filename': filename,
                'document_hash': doc_hash,
                'metadata': metadata,
                'chunks': enhanced_chunks,
                'chunk_count': len(enhanced_chunks),
                'total_tokens': sum(len(chunk['content'].split()) for chunk in enhanced_chunks),
                'processing_time': time.time() - metadata['processed_at']
            }

            st.success(f"✅ Successfully processed {filename} into {len(enhanced_chunks)} chunks")
            return processed_doc

        except Exception as e:
            st.error(f"❌ Document processing failed: {str(e)}")
            return None

    def validate_pdf_file(self, pdf_file) -> bool:
        """Comprehensive PDF validation with helpful feedback."""
        try:
            # Basic file type validation
            if hasattr(pdf_file, 'type') and pdf_file.type != 'application/pdf':
                st.error("❌ Please upload a valid PDF file")
                return False

            # Size validation
            if hasattr(pdf_file, 'size'):
                if pdf_file.size > self.config.MAX_FILE_SIZE:
                    size_mb = self.config.MAX_FILE_SIZE / (1024 * 1024)
                    st.error(f"❌ File size exceeds {size_mb:.1f}MB limit")
                    return False

                if pdf_file.size < 100:
                    st.error("❌ File appears to be too small or corrupted")
                    return False

            # PDF signature validation
            try:
                pdf_data = self._read_pdf_data(pdf_file)
                if not pdf_data.startswith(b'%PDF'):
                    st.error("❌ Invalid PDF file format")
                    return False

                st.success("✅ PDF file validation passed")
                return True

            except Exception as validation_error:
                st.warning(f"⚠️ PDF validation warning: {str(validation_error)}")
                return True  # Allow processing to continue

        except Exception as e:
            st.error(f"❌ File validation failed: {str(e)}")
            return False


# Replace the previous DocumentProcessor with our bulletproof version
DocumentProcessor = BulletproofDocumentProcessor
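

# ---------------------------------------------------------------------------
# Illustrative usage sketch: an assumption about how a Streamlit page might
# wire the processor in, not part of the processor itself. The helper name,
# uploader label, and summary line below are hypothetical; only
# validate_pdf_file() and process_document() come from the class above.
# ---------------------------------------------------------------------------
def _demo_upload_page():  # hypothetical helper; nothing in this module calls it
    """Minimal sketch of uploading a PDF and running the processing pipeline."""
    processor = DocumentProcessor()
    uploaded_pdf = st.file_uploader("Upload an HR policy PDF", type=["pdf"])
    if uploaded_pdf is not None and processor.validate_pdf_file(uploaded_pdf):
        processed = processor.process_document(uploaded_pdf, uploaded_pdf.name)
        if processed:
            st.write(
                f"Stored {processed['chunk_count']} chunks "
                f"({processed['total_tokens']} tokens) for {processed['filename']}."
            )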