| | """ |
| | File Upload Handler |
| | Supports: .txt, .csv, .md, .text, .log, .srt, .sub |
| | Max size: 1GB |
| | Auto encoding detection |
| | """ |
| |
|
| | import os |
| | import chardet |
| | from typing import Tuple |
| |
|
| | |
# Largest accepted upload, in bytes (1 GiB).
MAX_FILE_SIZE = 1 * 1024 * 1024 * 1024

# Default cap on the number of characters kept from a single file.
MAX_CHARACTERS = 50_000_000

# Accepted file extensions: lower-case, including the leading dot.
SUPPORTED_EXTENSIONS = {'.txt', '.csv', '.md', '.text', '.log', '.srt', '.sub'}


class FileHandler:
    """Validate uploaded text files and extract their cleaned content.

    Every method is static; the class only groups related helpers.
    Failures are reported through status strings rather than exceptions.
    """

    @staticmethod
    def validate_file(filepath: str) -> Tuple[bool, str]:
        """Check that *filepath* is an acceptable upload.

        Args:
            filepath: Path of the file to check.

        Returns:
            ``(True, message)`` when the file is a regular, non-empty file
            with a supported extension no larger than ``MAX_FILE_SIZE``;
            otherwise ``(False, reason)``.
        """
        # isfile rather than exists: a directory called "notes.txt" is not
        # something that can be read as text.
        if not filepath or not os.path.isfile(filepath):
            return False, "❌ File not found!"

        ext = os.path.splitext(filepath)[1].lower()
        if ext not in SUPPORTED_EXTENSIONS:
            # Sorted so the message is identical on every run; iteration
            # order of a set of strings is not stable across processes.
            supported = ', '.join(sorted(SUPPORTED_EXTENSIONS))
            return False, f"❌ Unsupported file type: {ext}\nSupported: {supported}"

        file_size = os.path.getsize(filepath)
        if file_size == 0:
            return False, "❌ File is empty!"

        if file_size > MAX_FILE_SIZE:
            bytes_per_gb = 1024 ** 3
            size_gb = file_size / bytes_per_gb
            max_gb = MAX_FILE_SIZE / bytes_per_gb
            return False, f"❌ File too large: {size_gb:.2f}GB (Max: {max_gb:g}GB)"

        return True, "✅ File valid"

    @staticmethod
    def detect_encoding(filepath: str) -> str:
        """Guess the text encoding of *filepath* from its first 100 KB.

        Detection is best effort: any failure, a missing result, or a
        confidence below 0.5 yields ``'utf-8'``.

        Args:
            filepath: Path of the file to inspect.

        Returns:
            The encoding name reported by chardet, or ``'utf-8'``.
        """
        sample_size = 102400
        try:
            with open(filepath, 'rb') as f:
                raw = f.read(sample_size)

            result = chardet.detect(raw)
            encoding = result.get('encoding')
            confidence = result.get('confidence') or 0
        except Exception:
            # Deliberately broad: detection must never block reading.
            return 'utf-8'

        if not encoding or confidence < 0.5:
            return 'utf-8'

        # Only a sample was inspected. ASCII is a strict subset of UTF-8, so
        # decoding as UTF-8 loses nothing and keeps any non-ASCII characters
        # that appear after the sample instead of silently dropping them.
        if encoding.lower() == 'ascii':
            return 'utf-8'

        return encoding

    @staticmethod
    def _read_text(filepath: str, encoding: str, limit: int) -> str:
        """Return at most *limit* decoded characters, skipping bad bytes."""
        with open(filepath, 'r', encoding=encoding, errors='ignore') as f:
            return f.read(limit)

    @staticmethod
    def _format_count(count: int, spec: str = ".1f") -> str:
        """Render *count* compactly, e.g. ``1.5K`` or ``2.0M``."""
        if count >= 1_000_000:
            return f"{count / 1_000_000:{spec}}M"
        if count >= 1_000:
            return f"{count / 1_000:{spec}}K"
        return str(count)

    @staticmethod
    def read_file(filepath: str, max_characters: int = MAX_CHARACTERS) -> Tuple[str, str]:
        """Read, trim and clean the text stored in *filepath*.

        Args:
            filepath: Path of the file to read.
            max_characters: Maximum number of characters to keep; longer
                content is cut and the status says so. Negative values are
                treated as 0.

        Returns:
            ``(text_content, status_message)``. On any failure the text is
            ``""`` and the message explains why. The character count in the
            status is measured before cleaning.
        """
        is_valid, msg = FileHandler.validate_file(filepath)
        if not is_valid:
            return "", msg

        size_mb = os.path.getsize(filepath) / (1024 * 1024)
        limit = max(0, max_characters)
        used_fallback = False

        try:
            encoding = FileHandler.detect_encoding(filepath)
            try:
                # Reading one character past the limit is enough to know
                # whether content was cut, without loading the whole file.
                text = FileHandler._read_text(filepath, encoding, limit + 1)
            except (LookupError, UnicodeError):
                # The detector may name a codec Python does not provide;
                # retry as UTF-8 instead of failing the upload.
                used_fallback = True
                try:
                    text = FileHandler._read_text(filepath, 'utf-8', limit + 1)
                except Exception as e:
                    return "", f"❌ Cannot read file: {str(e)}"

            trimmed = len(text) > limit
            if trimmed:
                text = text[:limit]
            char_count = len(text)

            text = FileHandler.clean_text(text)

        except MemoryError:
            return "", "❌ File too large for memory! Try a smaller file."

        except Exception as e:
            return "", f"❌ Error reading file: {str(e)}"

        if used_fallback:
            status = f"✅ Loaded (fallback encoding): {size_mb:.1f}MB"
        else:
            char_display = FileHandler._format_count(char_count)
            status = (
                f"✅ Loaded: {size_mb:.1f}MB | {char_display} characters"
                f" | Encoding: {encoding}"
            )

        if trimmed:
            limit_display = FileHandler._format_count(limit, "g")
            status += f" | ⚠️ Trimmed to {limit_display} characters"

        return text, status

    @staticmethod
    def clean_text(text: str) -> str:
        """Normalize raw file text.

        Removes NUL characters, converts CRLF/CR line endings to LF, strips
        leading and trailing whitespace from every line, keeps at most two
        consecutive blank lines, and strips the final result.

        Args:
            text: Raw text; may be empty or ``None``.

        Returns:
            The cleaned text, or ``""`` for falsy input.
        """
        if not text:
            return ""

        text = text.replace('\x00', '')
        text = text.replace('\r\n', '\n').replace('\r', '\n')

        cleaned_lines = []
        blank_run = 0
        for line in text.split('\n'):
            stripped = line.strip()
            if stripped:
                blank_run = 0
                cleaned_lines.append(stripped)
                continue

            blank_run += 1
            if blank_run <= 2:
                cleaned_lines.append('')

        return '\n'.join(cleaned_lines).strip()

    @staticmethod
    def get_file_info(filepath: str) -> dict:
        """Describe *filepath*.

        Args:
            filepath: Path of the file to describe.

        Returns:
            ``{"error": "File not found"}`` when the path is empty or not a
            regular file; otherwise a dict with ``name``, ``size_bytes``,
            ``size_mb``, ``extension`` (as written in the path) and the
            detected ``encoding``.
        """
        # Same regular-file rule as validate_file, so both agree on what
        # counts as a usable upload.
        if not filepath or not os.path.isfile(filepath):
            return {"error": "File not found"}

        file_size = os.path.getsize(filepath)
        ext = os.path.splitext(filepath)[1]

        return {
            "name": os.path.basename(filepath),
            "size_bytes": file_size,
            "size_mb": file_size / (1024 * 1024),
            "extension": ext,
            "encoding": FileHandler.detect_encoding(filepath),
        }
| |
|
| |
|
def process_uploaded_file(file) -> str:
    """Turn an uploaded file into text for the UI.

    Args:
        file: ``None``, an object with a ``name`` attribute holding the
            file path, or any value whose ``str()`` is the path.
            NOTE(review): presumably a Gradio upload object; confirm
            against the UI wiring.

    Returns:
        ``""`` when *file* is ``None``; the extracted text on success;
        otherwise the status message from ``FileHandler.read_file``. The
        status is also returned when a readable file yields no text after
        cleaning.
    """
    if file is None:
        return ""

    filepath = file.name if hasattr(file, 'name') else str(file)

    text, status = FileHandler.read_file(filepath)

    # Empty text means there is nothing to show, so surface the status
    # message in its place.
    if not text:
        return status

    print(f"📁 File loaded: {status}")
    return text