import chardet
import pypdf
import docx
from pdf2image import convert_from_bytes
import pytesseract
from PIL import Image
from typing import Tuple, List, Dict, Optional
import json
import os
import re
import subprocess
import shutil
from datetime import datetime
from pathlib import Path

import spacy
import nltk
from nltk.tokenize import sent_tokenize
from nltk.corpus import stopwords

import streamlit as st
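
# Deployment note (an assumption based on the imports above, not part of the original
# code): pdf2image and pytesseract shell out to system binaries, so a Hugging Face
# Space typically needs "poppler-utils" and "tesseract-ocr" listed in packages.txt,
# with the Python packages above pinned in requirements.txt. Poppler itself is used
# through the pdftoppm binary, not through a Python import.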
class DocumentProcessor:
    def __init__(self, base_path: Optional[str] = None):
        """Initialize Document Processor with proper data directory handling."""
        self.base_path = self._setup_data_directories(base_path)
        self.ontology_path = os.path.join(self.base_path, "ontology", "legal_ontology.json")

        # Initialize NLP components
        self._initialize_nlp()

        # Ensure ontology exists
        self._ensure_ontology_exists()

        # Load ontology
        self.ontology = self._load_ontology()

        # Create processing directories
        self.processed_path = os.path.join(self.base_path, "processed")
        self.temp_path = os.path.join(self.base_path, "temp")
        os.makedirs(self.processed_path, exist_ok=True)
        os.makedirs(self.temp_path, exist_ok=True)

    def _setup_data_directories(self, base_path: Optional[str] = None) -> str:
        """Set up data directories with error handling."""
        data_path = base_path or os.path.join(os.getcwd(), "data")
        subdirs = ["ontology", "processed", "temp", "indexes"]
        for subdir in subdirs:
            os.makedirs(os.path.join(data_path, subdir), exist_ok=True)
        return data_path
    def _initialize_nlp(self):
        """Initialize NLP components."""
        try:
            # Load spaCy model
            try:
                self.nlp = spacy.load("en_core_web_sm")
            except OSError:
                st.info("Downloading spaCy model...")
                os.system("python -m spacy download en_core_web_sm")
                self.nlp = spacy.load("en_core_web_sm")

            # Initialize NLTK
            nltk_data_dir = os.path.join(self.base_path, "nltk_data")
            os.makedirs(nltk_data_dir, exist_ok=True)
            nltk.data.path.append(nltk_data_dir)

            required_resources = ['punkt', 'averaged_perceptron_tagger', 'maxent_ne_chunker', 'words', 'stopwords']
            for resource in required_resources:
                try:
                    nltk.download(resource, download_dir=nltk_data_dir, quiet=True)
                except Exception as e:
                    st.warning(f"Could not download {resource}: {str(e)}")

            self.stop_words = set(stopwords.words('english'))
        except Exception as e:
            st.error(f"Error initializing NLP components: {str(e)}")
            raise
    def _ensure_ontology_exists(self):
        """Ensure the legal ontology file exists, create if not."""
        if not os.path.exists(self.ontology_path):
            default_ontology = {
                "@graph": [
                    {
                        "@id": "concept:Contract",
                        "@type": "vocab:LegalConcept",
                        "rdfs:label": "Contract",
                        "rdfs:comment": "A legally binding agreement between parties",
                        "vocab:relatedConcepts": ["Offer", "Acceptance", "Consideration"]
                    },
                    {
                        "@id": "concept:Judgment",
                        "@type": "vocab:LegalConcept",
                        "rdfs:label": "Judgment",
                        "rdfs:comment": "A court's final determination",
                        "vocab:relatedConcepts": ["Court Order", "Decision", "Ruling"]
                    }
                ]
            }
            with open(self.ontology_path, 'w') as f:
                json.dump(default_ontology, f, indent=2)

    def _load_ontology(self) -> Dict:
        """Load legal ontology with error handling."""
        try:
            with open(self.ontology_path, 'r') as f:
                return json.load(f)
        except Exception as e:
            st.error(f"Error loading ontology: {str(e)}")
            return {"@graph": []}
    def process_and_tag_document(self, file) -> Tuple[str, List[Dict], Dict]:
        """Process document and generate metadata."""
        try:
            doc_id = datetime.now().strftime('%Y%m%d_%H%M%S')
            doc_dir = os.path.join(self.processed_path, doc_id)
            os.makedirs(doc_dir, exist_ok=True)

            original_path = os.path.join(doc_dir, "original" + Path(file.name).suffix)
            with open(original_path, 'wb') as f:
                f.write(file.getvalue())

            # Extract text and process document
            text, chunks = self.process_document(original_path)
            metadata = self._extract_metadata(text, file.name)
            metadata.update({"doc_id": doc_id, "original_path": original_path})

            # Save processed data
            self._save_processed_data(doc_dir, text, chunks, metadata)
            return text, chunks, metadata
        except Exception as e:
            st.error(f"Error in document processing pipeline: {str(e)}")
            raise

    def _tokenize_text(self, text: str) -> List[str]:
        """Tokenize text into sentences using NLTK."""
        try:
            return sent_tokenize(text)
        except Exception:
            return [sentence.strip() for sentence in text.split('.') if sentence.strip()]
    def process_document(self, file_path: str) -> Tuple[str, List[Dict]]:
        """Process a document based on its type."""
        file_type = Path(file_path).suffix.lower()
        if file_type == '.pdf':
            text = self._process_pdf(file_path)
        elif file_type == '.docx':
            text = self._process_docx(file_path)
        elif file_type in ['.txt', '.csv']:
            text = self._process_text(file_path)
        else:
            raise ValueError(f"Unsupported file type: {file_type}")

        chunks = self._create_chunks(text)
        return text, chunks
    def _process_pdf(self, file_path: str) -> Optional[str]:
        """Extract text from a PDF by rendering pages to images and running OCR."""
        try:
            # First verify if poppler is installed via package manager
            try:
                subprocess.check_output(['pdftoppm', '-v'], stderr=subprocess.STDOUT)
                st.success("✓ Poppler found on system")
            except (subprocess.CalledProcessError, FileNotFoundError):
                # If not in default path, check common installation directories
                poppler_paths = [
                    "/usr/bin",
                    "/usr/local/bin",
                    "/opt/poppler/bin",
                    "/app/.apt/usr/bin",  # Common HF Spaces path
                    os.path.expanduser("~/.local/bin")
                ]
                for poppler_dir in poppler_paths:
                    if os.path.exists(os.path.join(poppler_dir, "pdftoppm")):
                        # Update PATH
                        os.environ["PATH"] = f"{poppler_dir}:{os.environ.get('PATH', '')}"
                        st.success(f"✓ Found Poppler in {poppler_dir}")
                        break
                else:
                    st.error("❌ Poppler not found. Please ensure 'poppler-utils' is in packages.txt")
                    return None

            # Attempt to read and convert the PDF
            try:
                with open(file_path, 'rb') as pdf_file:
                    pdf_bytes = pdf_file.read()

                # Convert PDF to images
                images = convert_from_bytes(
                    pdf_bytes,
                    dpi=300,  # Increase DPI for better OCR quality
                    fmt='png'
                )

                # Process each page
                text = ""
                total_pages = len(images)
                for page_num, image in enumerate(images, 1):
                    st.progress(page_num / total_pages)
                    st.info(f"📄 Processing page {page_num}/{total_pages}")

                    # Perform OCR with custom configuration
                    page_text = pytesseract.image_to_string(
                        image,
                        config='--psm 3 --oem 3'  # Use default page segmentation and OCR Engine Mode
                    )
                    text += f"\n{'='*20} Page {page_num} {'='*20}\n{page_text}\n"

                return text.strip()
            except Exception as e:
                st.error(f"Error processing PDF content: {str(e)}")
                return None
        except Exception as e:
            st.error(f"Unexpected error: {str(e)}")
            return None
    def _process_docx(self, file_path: str) -> str:
        """Extract text from DOCX files."""
        try:
            doc = docx.Document(file_path)
            return "\n".join(para.text for para in doc.paragraphs if para.text.strip())
        except Exception as e:
            st.error(f"Error processing DOCX: {str(e)}")
            raise
    def _process_text(self, file_path: str) -> str:
        """Process plain text files."""
        try:
            with open(file_path, 'rb') as f:
                raw_data = f.read()
            # chardet may report None for the encoding, so fall back to UTF-8
            encoding = chardet.detect(raw_data).get('encoding') or 'utf-8'
            return raw_data.decode(encoding)
        except Exception as e:
            st.error(f"Error processing text file: {str(e)}")
            raise
    def _create_chunks(self, text: str) -> List[Dict]:
        """Chunk text for further processing."""
        sentences = self._tokenize_text(text)
        chunk_size = 500
        chunks = []
        current_chunk, current_length = [], 0

        for sentence in sentences:
            if current_length + len(sentence) > chunk_size and current_chunk:
                chunks.append(self._process_chunk(' '.join(current_chunk), len(chunks)))
                current_chunk, current_length = [], 0
            current_chunk.append(sentence)
            current_length += len(sentence)

        if current_chunk:
            chunks.append(self._process_chunk(' '.join(current_chunk), len(chunks)))
        return chunks
    def _link_to_ontology(self, text: str) -> List[Dict]:
        """Link text to legal ontology concepts."""
        relevant_concepts = []
        text_lower = text.lower()

        for concept in self.ontology.get("@graph", []):
            if "rdfs:label" not in concept:
                continue
            label = concept["rdfs:label"].lower()
            if label in text_lower:
                # Get surrounding context
                start_idx = text_lower.index(label)
                context_start = max(0, start_idx - 100)
                context_end = min(len(text), start_idx + len(label) + 100)
                relevant_concepts.append({
                    "concept": concept["rdfs:label"],
                    "type": concept.get("@type", "Unknown"),
                    "description": concept.get("rdfs:comment", ""),
                    "context": text[context_start:context_end].strip(),
                    "location": {"start": start_idx, "end": start_idx + len(label)}
                })
        return relevant_concepts
    def _process_chunk(self, text: str, chunk_id: int) -> Dict:
        """Process individual chunks with NLP and ontology linking."""
        doc = self.nlp(text)
        return {
            'chunk_id': chunk_id,
            'text': text,
            'entities': [(ent.text, ent.label_) for ent in doc.ents],
            'noun_phrases': [np.text for np in doc.noun_chunks],
            'ontology_links': self._link_to_ontology(text)
        }

    def _extract_metadata(self, text: str, file_name: str) -> Dict:
        """Extract metadata from text."""
        doc = self.nlp(text[:10000])
        return {
            'filename': file_name,
            'file_type': Path(file_name).suffix.lower(),
            'processed_at': datetime.now().isoformat(),
            'entities': [(ent.text, ent.label_) for ent in doc.ents],
            'document_type': 'Legal Document'
        }
    def _save_processed_data(self, doc_dir: str, text: str, chunks: List[Dict], metadata: Dict):
        """Save processed data to disk."""
        with open(os.path.join(doc_dir, "processed.txt"), 'w', encoding='utf-8') as f:
            f.write(text)
        with open(os.path.join(doc_dir, "chunks.json"), 'w') as f:
            json.dump(chunks, f, indent=2)
        with open(os.path.join(doc_dir, "metadata.json"), 'w') as f:
            json.dump(metadata, f, indent=2)

    def cleanup(self):
        """Clean up temporary files."""
        shutil.rmtree(self.temp_path, ignore_errors=True)
        os.makedirs(self.temp_path, exist_ok=True)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cleanup()