quantumbit's picture
Upload 41 files
5ff6b14 verified
raw
history blame
5.23 kB
import aiohttp
import asyncio
import tempfile
import os
import re
from urllib.parse import urlparse
from typing import List, Tuple
class FileDownloader:
    """Download remote files of supported types into local temporary files.

    ``download_file`` streams the response body to a temporary file and
    returns its path; ``cleanup_temp_file`` removes such a file afterwards.
    """

    # Extensions (lower-case, with leading dot) that download_file will fetch.
    SUPPORTED_EXTENSIONS = frozenset(
        {'.pdf', '.docx', '.pptx', '.png', '.xlsx', '.jpeg', '.jpg', '.txt', '.csv'}
    )

    # Matches the filename parameter of a Content-Disposition header: either a
    # quoted value (group 1) or an unquoted token ending at the next ';'
    # (group 2), so trailing parameters are never swallowed into the name.
    _FILENAME_RE = re.compile(r'filename\s*=\s*(?:"([^"]*)"|([^;]+))', re.IGNORECASE)

    # Report progress each time this many more bytes have been written.
    _PROGRESS_STEP = 1024 * 1024

    @staticmethod
    def _pick_filename(content_disposition: str, url: str) -> str:
        """Return the file name from Content-Disposition, else from the URL path.

        Falls back to ``"downloaded_file"`` when neither source yields a name.
        """
        # Local import: the file-level imports only bring in urlparse.
        from urllib.parse import unquote

        match = FileDownloader._FILENAME_RE.search(content_disposition or '')
        if match:
            name = (match.group(1) or match.group(2) or '').strip()
            if name:
                return name
        # Decode percent-encoding so "my%20file.pdf" becomes "my file.pdf".
        name = os.path.basename(unquote(urlparse(url).path))
        return name or "downloaded_file"

    @staticmethod
    def _parse_content_length(raw):
        """Return the Content-Length header as a non-negative int, or None.

        A missing or malformed header must not abort the download; the size
        is only used for progress output.
        """
        try:
            size = int(raw)
        except (TypeError, ValueError):
            return None
        return size if size >= 0 else None

    @staticmethod
    async def _stream_to_temp_file(response, ext: str, total_size) -> str:
        """Write the response body to a new temp file and return its path.

        The file is closed in every case and deleted again if streaming fails
        or is cancelled, so failed attempts do not leave partial files behind.
        """
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext, prefix="download_")
        step = FileDownloader._PROGRESS_STEP
        try:
            with temp_file:
                downloaded = 0
                next_report = step
                async for chunk in response.content.iter_chunked(16384):
                    temp_file.write(chunk)
                    downloaded += len(chunk)
                    # Threshold-based so that short chunks cannot make the
                    # report skip; total_size of 0/None disables reporting.
                    if total_size and downloaded >= next_report:
                        progress = (downloaded / total_size) * 100
                        print(f" Progress: {progress:.1f}% ({downloaded / (1024*1024):.1f} MB)")
                        while next_report <= downloaded:
                            next_report += step
        except BaseException:
            # BaseException so task cancellation also removes the partial file.
            try:
                os.unlink(temp_file.name)
            except OSError:
                pass
            raise
        return temp_file.name

    async def download_file(self, url: str, timeout: int = 300, max_retries: int = 3) -> Tuple[str, str]:
        """Download a file from *url* to a temporary file, retrying on failure.

        Args:
            url: Address to fetch with an HTTP GET.
            timeout: Total per-attempt timeout in seconds (connect is capped
                at 30s and each socket read at 120s).
            max_retries: Number of attempts before giving up.

        Returns:
            One of three tuples:
            * ``(temp_path, ext)`` - the file was saved; ``ext`` is the
              lower-cased extension without the dot.
            * ``(url, "url")`` - no file extension could be determined, so
              nothing was downloaded.
            * ``('not supported', ext)`` - the extension is not in
              ``SUPPORTED_EXTENSIONS``; nothing was downloaded.

        Raises:
            Exception: when every attempt failed (non-200 status, timeout or
                any other error). The last underlying error is chained as
                ``__cause__``.
        """
        print(f"πŸ“₯ Downloading file from: {url[:60]}...")
        last_error = None
        for attempt in range(max_retries):
            try:
                timeout_config = aiohttp.ClientTimeout(
                    total=timeout,
                    connect=30,
                    sock_read=120,
                )
                async with aiohttp.ClientSession(timeout=timeout_config) as session:
                    print(f" Attempt {attempt + 1}/{max_retries} (timeout: {timeout}s)")
                    async with session.get(url) as response:
                        if response.status != 200:
                            raise Exception(f"Failed to download file: HTTP {response.status}")

                        # Extract filename from header or URL
                        filename = self._pick_filename(
                            response.headers.get('Content-Disposition', ''), url
                        )
                        # Lower-case so "REPORT.PDF" is treated like "report.pdf".
                        ext = os.path.splitext(filename)[1].lower()
                        if not ext:
                            # No extension: hand the URL itself back to the caller.
                            return url, "url"
                        print(f" πŸ“ Detected filename: {filename}, extension: {ext}")

                        # splitext guarantees a leading dot on a non-empty ext.
                        ext_without_dot = ext[1:]
                        if ext not in self.SUPPORTED_EXTENSIONS:
                            print(f" ❌ File type not supported: {ext}")
                            return 'not supported', ext_without_dot

                        total_size = self._parse_content_length(
                            response.headers.get('content-length')
                        )
                        if total_size is not None:
                            print(f" File size: {total_size / (1024 * 1024):.1f} MB")

                        temp_path = await self._stream_to_temp_file(response, ext, total_size)
                        print(f"βœ… File downloaded successfully: {temp_path}")
                        return temp_path, ext_without_dot
            except asyncio.TimeoutError as e:
                last_error = e
                print(f" ⏰ Timeout on attempt {attempt + 1}")
                if attempt < max_retries - 1:
                    # Timeouts back off more slowly than other errors.
                    wait_time = (attempt + 1) * 30
                    print(f" ⏳ Waiting {wait_time}s before retry...")
                    await asyncio.sleep(wait_time)
            except Exception as e:
                last_error = e
                print(f" ❌ Error on attempt {attempt + 1}: {str(e)}")
                if attempt < max_retries - 1:
                    wait_time = (attempt + 1) * 15
                    print(f" ⏳ Waiting {wait_time}s before retry...")
                    await asyncio.sleep(wait_time)
        raise Exception(f"Failed to download file after {max_retries} attempts") from last_error

    def cleanup_temp_file(self, temp_path: str) -> None:
        """Delete *temp_path* if it exists; never raises.

        Cleanup is best-effort: any failure is reported as a printed warning
        rather than propagated to the caller.
        """
        try:
            if os.path.exists(temp_path):
                os.unlink(temp_path)
                print(f"πŸ—‘οΈ Cleaned up temporary file: {temp_path}")
        except Exception as e:
            print(f"⚠️ Warning: Could not cleanup temp file {temp_path}: {e}")