import asyncio
import os
import re
import tempfile
from typing import List, Optional, Tuple
from urllib.parse import unquote, urlparse

import aiohttp


class FileDownloader:
    """Enhanced file downloader that supports multiple file types."""

    # Extensions (lower-case, leading dot) that download_file will fetch.
    SUPPORTED_EXTENSIONS = frozenset({
        '.pdf', '.docx', '.pptx', '.png', '.xlsx',
        '.jpeg', '.jpg', '.txt', '.csv',
    })

    # Size of each chunk read from the response body, in bytes.
    _CHUNK_SIZE = 16384
    # A progress line is printed each time this many more bytes have arrived.
    _PROGRESS_STEP = 1024 * 1024
    # Stops at ';' as well as '"' so that parameters following an unquoted
    # filename (e.g. "filename=a.pdf; size=1") are not captured with it.
    _FILENAME_RE = re.compile(r'filename="?([^";]+)"?', re.IGNORECASE)

    async def download_file(self, url: str, timeout: int = 300, max_retries: int = 3) -> Tuple[str, str]:
        """Download a file from a URL to a temporary file, retrying on failure.

        Args:
            url: Address to fetch with an HTTP GET.
            timeout: Total time budget per attempt, in seconds.
            max_retries: Maximum number of attempts.

        Returns:
            One of three tuples:
            * ``(temp_file_path, extension)`` on success; the extension is
              lower-case and has no leading dot.
            * ``(url, "url")`` when no file extension could be determined.
            * ``('not supported', extension)`` when the extension is not in
              ``SUPPORTED_EXTENSIONS``.

        Raises:
            Exception: When every attempt failed. The error of the last
                attempt is attached as ``__cause__``.
        """
        print(f"📥 Downloading file from: {url[:60]}...")

        last_error: Optional[BaseException] = None
        for attempt in range(max_retries):
            try:
                return await self._attempt_download(url, timeout, attempt, max_retries)
            except asyncio.TimeoutError as exc:
                last_error = exc
                print(f" ⏰ Timeout on attempt {attempt + 1}")
                # Timeouts back off twice as long as other errors.
                wait_time = (attempt + 1) * 30
            except Exception as exc:
                last_error = exc
                print(f" ❌ Error on attempt {attempt + 1}: {str(exc)}")
                wait_time = (attempt + 1) * 15

            # No point in sleeping after the final attempt.
            if attempt < max_retries - 1:
                print(f" ⏳ Waiting {wait_time}s before retry...")
                await asyncio.sleep(wait_time)

        raise Exception(f"Failed to download file after {max_retries} attempts") from last_error

    async def _attempt_download(self, url: str, timeout: int, attempt: int,
                                max_retries: int) -> Tuple[str, str]:
        """Perform one download attempt; see download_file for the return shapes."""
        timeout_config = aiohttp.ClientTimeout(
            total=timeout,
            connect=30,
            sock_read=120,
        )

        async with aiohttp.ClientSession(timeout=timeout_config) as session:
            print(f" Attempt {attempt + 1}/{max_retries} (timeout: {timeout}s)")

            async with session.get(url) as response:
                if response.status != 200:
                    raise Exception(f"Failed to download file: HTTP {response.status}")

                filename = self._resolve_filename(
                    url, response.headers.get('Content-Disposition', '')
                )
                # Lower-cased so that e.g. "REPORT.PDF" is treated like "report.pdf".
                ext = os.path.splitext(filename)[1].lower()
                if not ext:
                    return url, "url"

                print(f" 📁 Detected filename: {filename}, extension: {ext}")

                # A non-empty splitext() extension always starts with a dot.
                ext_without_dot = ext[1:]
                if ext not in self.SUPPORTED_EXTENSIONS:
                    print(f" ❌ File type not supported: {ext}")
                    return 'not supported', ext_without_dot

                total_size = self._parse_content_length(
                    response.headers.get('content-length')
                )
                if total_size is not None:
                    print(f" File size: {total_size / (1024 * 1024):.1f} MB")

                temp_path = await self._stream_to_temp_file(response, ext, total_size)

                print(f"✅ File downloaded successfully: {temp_path}")
                return temp_path, ext_without_dot

    @classmethod
    def _resolve_filename(cls, url: str, content_disposition: str) -> str:
        """Pick a filename from the Content-Disposition value, else from the URL path."""
        match = cls._FILENAME_RE.search(content_disposition)
        if match:
            # basename() drops any directory part a server may have included.
            header_name = os.path.basename(match.group(1).strip())
            if header_name:
                return header_name

        # unquote() decodes percent-escapes such as "%20" in the URL path.
        url_name = os.path.basename(unquote(urlparse(url).path))
        return url_name or "downloaded_file"

    @staticmethod
    def _parse_content_length(raw_value: Optional[str]) -> Optional[int]:
        """Return the Content-Length as an int, or None if absent or malformed."""
        if not raw_value:
            return None
        try:
            size = int(raw_value)
        except ValueError:
            return None
        return size if size >= 0 else None

    async def _stream_to_temp_file(self, response: "aiohttp.ClientResponse", ext: str,
                                   total_size: Optional[int]) -> str:
        """Write the response body to a new temp file and return its path.

        The file is removed again if streaming fails for any reason, so failed
        attempts do not leave partial files behind.
        """
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext, prefix="download_")
        try:
            # The with-block closes the handle on success and on error alike.
            with temp_file:
                downloaded = 0
                next_report = self._PROGRESS_STEP
                async for chunk in response.content.iter_chunked(self._CHUNK_SIZE):
                    temp_file.write(chunk)
                    downloaded += len(chunk)
                    # Threshold-based rather than "downloaded % step == 0":
                    # chunks may be shorter than requested, so the running
                    # total seldom lands exactly on a step boundary.
                    # A total_size of 0 or None disables progress output.
                    if total_size and downloaded >= next_report:
                        progress = (downloaded / total_size) * 100
                        print(f" Progress: {progress:.1f}% ({downloaded / (1024*1024):.1f} MB)")
                        next_report = (downloaded // self._PROGRESS_STEP + 1) * self._PROGRESS_STEP
        except BaseException:
            # BaseException so that task cancellation also removes the file.
            self.cleanup_temp_file(temp_file.name)
            raise
        return temp_file.name

    def cleanup_temp_file(self, temp_path: str) -> None:
        """Delete a temporary file; best-effort, never raises.

        A path that does not exist is ignored silently. Any other failure is
        reported as a printed warning.
        """
        try:
            os.unlink(temp_path)
        except FileNotFoundError:
            # Already gone: nothing to clean up.
            return
        except Exception as e:
            print(f"⚠️ Warning: Could not cleanup temp file {temp_path}: {e}")
        else:
            print(f"🗑️ Cleaned up temporary file: {temp_path}")