File size: 4,244 Bytes
cfeb3a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import logging
import os
import shutil
import tempfile
import time
import uuid
from pathlib import Path
from typing import Dict

import PyPDF2
from PIL import Image

from config.settings import settings

class FileHandler:
    """Validate, store, preview and clean up uploaded files.

    Uploads are copied into ``<temp_dir>/<session_id>/input``. The temp
    directory and the size limit are read from the project ``settings``
    object when the handler is constructed.
    """

    # Number of characters a preview may contain before "..." is appended.
    PREVIEW_CHARS = 500

    _log = logging.getLogger(__name__)

    def __init__(self):
        """Read the temp directory and maximum upload size from ``settings``."""
        self.temp_dir = Path(settings.TEMP_DIR)
        self.max_size_mb = settings.MAX_FILE_SIZE_MB

    @staticmethod
    def _upload_size_bytes(uploaded_file) -> int:
        """Return the size of an upload in bytes.

        In-memory uploads expose ``getbuffer()``; anything else is measured
        on disk through the path stored in ``uploaded_file.name``.
        """
        if hasattr(uploaded_file, "getbuffer"):
            return len(uploaded_file.getbuffer())
        return os.path.getsize(uploaded_file.name)

    @classmethod
    def _truncate_preview(cls, text: str) -> str:
        """Cut *text* to ``PREVIEW_CHARS`` characters, marking the cut with "..."."""
        limit = cls.PREVIEW_CHARS
        return text[:limit] + "..." if len(text) > limit else text

    def validate_file(self, uploaded_file) -> Dict:
        """Check that an upload exists, is small enough and has a supported type.

        Args:
            uploaded_file: Upload object with a ``name`` attribute. It may be
                an in-memory buffer (``getbuffer()``) or an object whose
                ``name`` is a path on disk.

        Returns:
            A dict with keys ``valid`` (bool), ``error`` (message or ``None``)
            and ``file_info`` (``{"name", "size_mb", "type"}`` when valid,
            otherwise ``None``).
        """
        validation = {"valid": False, "error": None, "file_info": None}
        if not uploaded_file:
            validation["error"] = "No file"
            return validation

        try:
            size_bytes = self._upload_size_bytes(uploaded_file)
        except OSError:
            # The path behind a path-only upload is missing or unreadable.
            validation["error"] = "File not readable"
            return validation

        file_size_mb = size_bytes / (1024 * 1024)
        if file_size_mb > self.max_size_mb:
            validation["error"] = "File too large"
            return validation

        # uploaded_file.name may be a full temp path, so take the extension
        # from the base name only; dots in directory names must not count.
        filename = os.path.basename(uploaded_file.name)
        file_extension = os.path.splitext(filename)[1].lstrip(".").lower()
        if file_extension not in settings.SUPPORTED_FILE_TYPES:
            validation["error"] = "Unsupported type"
            return validation

        validation["valid"] = True
        validation["file_info"] = {
            "name": filename,
            "size_mb": file_size_mb,
            "type": file_extension,
        }
        return validation

    def save_uploaded_file(self, uploaded_file, session_id: str) -> str:
        """Copy an upload into the session's input directory.

        Args:
            uploaded_file: Upload object with a ``name`` attribute. Its
                content is taken from ``getbuffer()``, else ``read()``, else
                from the file at the path ``uploaded_file.name``.
            session_id: Name of the session directory. A random 8-character
                id is generated when it is empty or ``None``.

        Returns:
            The path of the stored copy, as a string.

        Raises:
            OSError: If the directory cannot be created or the data cannot be
                read or written.
        """
        if not session_id:
            session_id = uuid.uuid4().hex[:8]

        session_dir = self.temp_dir / session_id / "input"
        session_dir.mkdir(parents=True, exist_ok=True)

        # Keep only the base name; uploaded_file.name may be a full temp path.
        filename = os.path.basename(uploaded_file.name)
        file_path = session_dir / filename

        self._log.info("Moving file from Gradio temp: %s", uploaded_file.name)
        self._log.info("To session directory: %s", file_path)

        if hasattr(uploaded_file, "getbuffer"):
            file_path.write_bytes(uploaded_file.getbuffer())
        elif hasattr(uploaded_file, "read"):
            # A stream that was already consumed would otherwise be stored as
            # an empty file, so rewind when the object allows it.
            if hasattr(uploaded_file, "seek"):
                try:
                    uploaded_file.seek(0)
                except (OSError, ValueError):
                    pass
            data = uploaded_file.read()
            if isinstance(data, str):
                # Text-mode streams return str; the copy is written as bytes.
                data = data.encode("utf-8")
            file_path.write_bytes(data)
        else:
            # Path-only upload objects: copy straight from the source path.
            try:
                shutil.copyfile(uploaded_file.name, file_path)
            except shutil.SameFileError:
                # Already in place; copying onto itself would only risk
                # truncating the file.
                pass
        return str(file_path)

    def get_file_preview(self, file_path: str, file_type: str) -> str:
        """Return a short text preview of a stored file.

        Args:
            file_path: Path of the file to preview.
            file_type: File extension such as ``"pdf"`` or ``"txt"``; matching
                ignores case and a leading dot.

        Returns:
            Up to ``PREVIEW_CHARS`` characters of text (first page for PDFs)
            followed by "..." when truncated, or a "not available" message
            when the type is unsupported or the file cannot be read.
        """
        kind = str(file_type or "").lower().lstrip(".")
        if kind == "pdf":
            try:
                with open(file_path, "rb") as file:
                    reader = PyPDF2.PdfReader(file)
                    if len(reader.pages) > 0:
                        text = reader.pages[0].extract_text()
                        return self._truncate_preview(text)
            except Exception:
                # Deliberately broad: previews are best-effort, so any failure
                # while parsing a PDF is reported as "no preview".
                return "PDF preview not available"
        elif kind == "txt":
            try:
                with open(file_path, "r", encoding="utf-8") as file:
                    # One character past the limit is enough to know whether
                    # the text was truncated; no need to load the whole file.
                    text = file.read(self.PREVIEW_CHARS + 1)
                return self._truncate_preview(text)
            except (OSError, UnicodeError, TypeError, ValueError):
                return "Text preview not available"
        # Similar for image types could be added
        return "Preview not available"

    def cleanup_temp_files(self, max_age_hours: float = 24) -> None:
        """Delete session directories that have not been modified recently.

        Cleanup is best-effort: errors are logged and never raised, and a
        directory that cannot be removed does not stop the others from being
        processed.

        Args:
            max_age_hours: Directories whose modification time is older than
                this many hours are removed. Defaults to 24.
        """
        max_age_seconds = max_age_hours * 3600
        current_time = time.time()
        try:
            entries = list(self.temp_dir.iterdir())
        except OSError as exc:
            # Typically the temp directory does not exist yet.
            self._log.debug("Skipping temp cleanup of %s: %s", self.temp_dir, exc)
            return

        for session_dir in entries:
            try:
                if not session_dir.is_dir():
                    continue
                dir_age = current_time - session_dir.stat().st_mtime
                if dir_age > max_age_seconds:
                    shutil.rmtree(session_dir)
            except OSError as exc:
                self._log.warning("Could not clean up %s: %s", session_dir, exc)