Spaces:
Running
Running
| from flask import Flask, render_template, jsonify, request, send_file | |
| import torch | |
| import os | |
| import time | |
| import threading | |
| from datetime import datetime, timedelta | |
| import cv2 | |
| from werkzeug.utils import secure_filename | |
| import uuid | |
| import mimetypes | |
| import numpy as np | |
| from PIL import Image | |
| import schedule | |
# --- Storage locations and retention policy ---
UPLOAD_FOLDER = '/data/uploads'    # uploaded source files are saved here
OUTPUT_FOLDER = '/data/outputs'    # upscaled results are written here
CLEANUP_INTERVAL_MINUTES = 10      # period of the scheduled cleanup job
FILE_MAX_AGE_HOURS = 1             # cleanup deletes files older than this

# --- Process-wide mutable state shared by handlers and worker threads ---
app_state = dict(
    cuda_available=torch.cuda.is_available(),
    processing_active=False,   # toggled by the upscaling workers
    logs=[],                   # most recent log lines (trimmed by log_message)
    processed_files=[],        # one dict per finished upscale job
    cleanup_stats=dict(
        last_cleanup=None,     # "YYYY-mm-dd HH:MM:SS" string once a cleanup has run
        files_deleted=0,       # running total across cleanups
        space_freed_mb=0,      # running total across cleanups
    ),
)
def ensure_directories():
    """Make sure the upload and output folders exist.

    Each folder is attempted independently; a failure is reported on stdout
    and does not prevent the other folder from being created.
    """
    for folder in (UPLOAD_FOLDER, OUTPUT_FOLDER):
        try:
            os.makedirs(folder, exist_ok=True)
            print(f"✅ Directory verified: {folder}")
        except Exception as err:
            print(f"⚠️ Error creating directory {folder}: {err}")
def allowed_file(filename):
    """Return True when *filename* ends in an accepted image or video extension.

    The comparison is case-insensitive and only looks at the text after the
    last dot; names without a dot are rejected.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    accepted = ('png', 'jpg', 'jpeg', 'gif', 'mp4', 'avi', 'mov', 'mkv')
    return extension.lower() in accepted
def get_file_mimetype(filename):
    """Return the MIME type to serve *filename* with.

    ``mimetypes.guess_type`` is consulted first. When it has no answer, the
    extension is looked up in a small table of the media types this app
    accepts; anything else falls back to ``application/octet-stream``.

    Args:
        filename: File name (or path) whose extension decides the type.

    Returns:
        A MIME type string; never ``None``.
    """
    guessed, _ = mimetypes.guess_type(filename)
    if guessed is not None:
        return guessed

    # Explicit table: the MIME subtype is not simply the extension
    # (e.g. .jpg is image/jpeg, .mov is video/quicktime).
    fallback_types = {
        'mp4': 'video/mp4',
        'avi': 'video/x-msvideo',
        'mov': 'video/quicktime',
        'mkv': 'video/x-matroska',
        'png': 'image/png',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'gif': 'image/gif',
    }
    ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
    return fallback_types.get(ext, 'application/octet-stream')
def log_message(message):
    """Record *message* in the in-memory log and echo it to stdout.

    The entry is prefixed with the current local time (HH:MM:SS). Only the
    100 most recent entries are kept in ``app_state["logs"]``.
    """
    stamp = datetime.now().strftime("%H:%M:%S")
    entry = f"[{stamp}] {message}"

    history = app_state["logs"]
    history.append(entry)
    if len(history) > 100:
        # Rebind to a trimmed copy so only the newest 100 lines survive.
        app_state["logs"] = history[-100:]

    print(entry)
def cleanup_old_files():
    """Remove expired files from the upload and output folders.

    A file is expired when its modification time is older than
    FILE_MAX_AGE_HOURS. After the sweep the cumulative cleanup statistics in
    ``app_state`` are updated and entries of ``app_state["processed_files"]``
    whose output file no longer exists are dropped. Errors are logged, never
    raised.
    """
    bytes_per_mb = 1024 * 1024
    try:
        now = datetime.now()
        expiry = now - timedelta(hours=FILE_MAX_AGE_HOURS)
        removed_count = 0
        removed_bytes = 0

        # Sweep both the upload and the output folder.
        for folder in (UPLOAD_FOLDER, OUTPUT_FOLDER):
            if not os.path.exists(folder):
                continue
            for name in os.listdir(folder):
                path = os.path.join(folder, name)
                if not os.path.isfile(path):
                    continue
                try:
                    modified = datetime.fromtimestamp(os.path.getmtime(path))
                    if modified >= expiry:
                        continue
                    # Size must be read before the file disappears.
                    size = os.path.getsize(path)
                    os.remove(path)
                    removed_count += 1
                    removed_bytes += size
                    log_message(f"🗑️ Deleted old file: {name} ({size / bytes_per_mb:.1f}MB)")
                except Exception as err:
                    log_message(f"⚠️ Error deleting {name}: {str(err)}")

        # Totals are cumulative across runs; last_cleanup is overwritten.
        totals = app_state["cleanup_stats"]
        totals["last_cleanup"] = now.strftime("%Y-%m-%d %H:%M:%S")
        totals["files_deleted"] += removed_count
        totals["space_freed_mb"] += removed_bytes / bytes_per_mb

        if removed_count > 0:
            log_message(f"🧹 Cleanup completed: {removed_count} files deleted, {removed_bytes / bytes_per_mb:.1f}MB freed")
        else:
            log_message(f"🧹 Cleanup completed: No old files to delete")

        # Forget history entries whose output file is gone.
        app_state["processed_files"] = [
            record
            for record in app_state["processed_files"]
            if os.path.exists(os.path.join(OUTPUT_FOLDER, record["output_file"]))
        ]
    except Exception as err:
        log_message(f"❌ Error during cleanup: {str(err)}")
def run_scheduler():
    """Start a daemon thread that drives the `schedule` job queue.

    The thread polls for pending jobs once a minute. If polling raises, the
    error is logged and the thread waits five minutes before polling again.
    """
    def _poll_forever():
        while True:
            try:
                schedule.run_pending()
                time.sleep(60)  # poll interval: one minute
            except Exception as err:
                log_message(f"❌ Scheduler error: {str(err)}")
                time.sleep(300)  # back off for five minutes after a failure

    threading.Thread(target=_poll_forever, daemon=True).start()
    log_message(f"🕒 File cleanup scheduler started (every {CLEANUP_INTERVAL_MINUTES} minutes)")
def optimize_gpu():
    """Enable cuDNN autotuning and TF32, then run a small CUDA smoke test.

    Returns:
        True when CUDA is available and the test matmul ran; False when CUDA
        is missing or any step raised (the error is logged).
    """
    try:
        if not torch.cuda.is_available():
            log_message("⚠️ CUDA not available")
            return False

        cudnn = torch.backends.cudnn
        cudnn.benchmark = True
        cudnn.allow_tf32 = True
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.cuda.empty_cache()

        # Smoke test: a tiny matmul on the device.
        probe = torch.randn(100, 100, device='cuda')
        torch.mm(probe, probe)

        log_message("✅ GPU optimized for 4K upscaling")
        return True
    except Exception as err:
        log_message(f"❌ Error optimizing GPU: {str(err)}")
        return False
def _build_sharpen_kernel(device, kernel_size=3, sigma=0.5):
    """Return a (1, 1, k, k) sharpening kernel: 2*delta minus a normalized Gaussian."""
    center = kernel_size // 2
    # Build the Gaussian with tensor ops: torch.exp() only accepts tensors,
    # so it cannot be called on the Python floats of a per-element loop.
    offsets = torch.arange(kernel_size, dtype=torch.float32, device=device) - center
    dist_sq = offsets[:, None] ** 2 + offsets[None, :] ** 2
    gaussian = torch.exp(-0.5 * dist_sq / (sigma ** 2))
    gaussian = gaussian / gaussian.sum()
    sharpen = -gaussian
    sharpen[center, center] += 2.0
    return sharpen.unsqueeze(0).unsqueeze(0)


def _convolve_channels(tensor, kernel, pad):
    """Convolve each channel of a BCHW tensor with the same kernel, reflect-padded by *pad*."""
    planes = []
    for idx in range(tensor.shape[1]):
        plane = tensor[:, idx:idx + 1, :, :]
        padded = torch.nn.functional.pad(plane, (pad, pad, pad, pad), mode='reflect')
        planes.append(torch.nn.functional.conv2d(padded, kernel))
    return torch.cat(planes, dim=1)


def _save_image(output_path, image):
    """Write *image* with OpenCV and raise if the writer reports failure."""
    if not cv2.imwrite(output_path, image):
        raise RuntimeError(f"Could not write output image: {output_path}")


def _upscale_image_cpu(image, w, h, target_w, target_h, output_path):
    """Upscale a BGR image 4x on the CPU (two bicubic steps plus a sharpen blend) and save it."""
    intermediate = cv2.resize(image, (w * 2, h * 2), interpolation=cv2.INTER_CUBIC)
    upscaled = cv2.resize(intermediate, (target_w, target_h), interpolation=cv2.INTER_CUBIC)

    kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
    sharpened = cv2.filter2D(upscaled, -1, kernel)

    # 70% plain upscale + 30% sharpened.
    final_result = cv2.addWeighted(upscaled, 0.7, sharpened, 0.3, 0)
    _save_image(output_path, final_result)
    log_message(f"✅ CPU upscaling completed: {target_w}x{target_h}")


def _upscale_image_gpu(image, device, w, h, target_w, target_h, output_path):
    """Upscale a BGR image 4x on *device* (bicubic, sharpen, smooth, contrast stretch) and save it."""
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image_tensor = torch.from_numpy(image_rgb).float().to(device) / 255.0
    image_tensor = image_tensor.permute(2, 0, 1).unsqueeze(0)  # HWC -> BCHW

    log_message("🧠 Applying neural upscaling...")
    interpolate = torch.nn.functional.interpolate
    with torch.no_grad():
        # Two successive 2x bicubic steps.
        intermediate = interpolate(
            image_tensor, size=(h * 2, w * 2),
            mode='bicubic', align_corners=False, antialias=True,
        )
        upscaled = interpolate(
            intermediate, size=(target_h, target_w),
            mode='bicubic', align_corners=False, antialias=True,
        )

        enhanced = _convolve_channels(upscaled, _build_sharpen_kernel(device), pad=1)

        # 5x5 binomial kernel for a light smoothing pass.
        gaussian_kernel = torch.tensor([
            [1, 4, 6, 4, 1],
            [4, 16, 24, 16, 4],
            [6, 24, 36, 24, 6],
            [4, 16, 24, 16, 4],
            [1, 4, 6, 4, 1],
        ], dtype=torch.float32, device=device).unsqueeze(0).unsqueeze(0) / 256.0
        smoothed = _convolve_channels(enhanced, gaussian_kernel, pad=2)

        # 70% sharpened + 30% smoothed, limited to the valid [0, 1] range.
        final_result = torch.clamp(0.7 * enhanced + 0.3 * smoothed, 0, 1)

        # Stretch every channel to the full [0, 1] range independently.
        for idx in range(final_result.shape[1]):
            plane = final_result[:, idx, :, :]
            min_val = plane.min()
            max_val = plane.max()
            if max_val > min_val:
                final_result[:, idx, :, :] = (plane - min_val) / (max_val - min_val)

    result_cpu = final_result.squeeze(0).permute(1, 2, 0).cpu().numpy()
    result_image = (result_cpu * 255).astype(np.uint8)
    result_bgr = cv2.cvtColor(result_image, cv2.COLOR_RGB2BGR)
    _save_image(output_path, result_bgr)

    final_h, final_w = result_bgr.shape[:2]
    log_message(f"✅ Upscaling completed: {final_w}x{final_h}")
    log_message(f"📈 Scale factor: {final_w/w:.1f}x")


def upscale_image_4k(input_path, output_path):
    """Upscale an image to 4x its size on a background daemon thread.

    The function returns immediately. The worker reads *input_path*, uses the
    GPU when CUDA is available and the estimated memory need fits, otherwise
    the CPU, writes the result to *output_path*, and appends a record to
    ``app_state["processed_files"]``. Failures are logged, not raised;
    ``app_state["processing_active"]`` is reset when the worker ends.

    Args:
        input_path: Path of the image to read.
        output_path: Path the upscaled image is written to.
    """
    def process_worker():
        try:
            log_message(f"🎨 Starting 4K upscaling: {os.path.basename(input_path)}")
            app_state["processing_active"] = True

            image = cv2.imread(input_path)
            if image is None:
                log_message("❌ Error: Could not read image")
                return

            h, w = image.shape[:2]
            log_message(f"📏 Original resolution: {w}x{h}")
            target_h, target_w = h * 4, w * 4

            if torch.cuda.is_available():
                device = torch.device('cuda')
                available_memory = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated()
                required_memory = w * h * 4 * 4 * 3 * 4  # Conservative estimation
                if required_memory > available_memory * 0.8:
                    log_message(f"⚠️ Image too large for available GPU memory, using CPU")
                    device = torch.device('cpu')
                else:
                    log_message(f"🚀 Using GPU: {torch.cuda.get_device_name()}")

                if device.type == 'cuda':
                    _upscale_image_gpu(image, device, w, h, target_w, target_h, output_path)
                else:
                    log_message("⚠️ Using CPU - optimized processing")
                    _upscale_image_cpu(image, w, h, target_w, target_h, output_path)
            else:
                log_message("💻 Using CPU processing (CUDA not available)")
                _upscale_image_cpu(image, w, h, target_w, target_h, output_path)

            app_state["processed_files"].append({
                "input_file": os.path.basename(input_path),
                "output_file": os.path.basename(output_path),
                "original_size": f"{w}x{h}",
                "upscaled_size": f"{target_w}x{target_h}",
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            })
        except Exception as e:
            log_message(f"❌ Error in processing: {str(e)}")
        finally:
            app_state["processing_active"] = False
            # GPU tensors are local to the helper, so they are already
            # unreferenced here; hand the cached blocks back to the driver.
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    thread = threading.Thread(target=process_worker)
    thread.daemon = True
    thread.start()
def upscale_video_4k(input_path, output_path):
    """Upscale a video to 4x its frame size on a background daemon thread.

    The function returns immediately. The worker reads *input_path* frame by
    frame, upscales on the GPU when CUDA is available (otherwise on the CPU),
    writes an ``mp4v``-encoded file to *output_path*, and appends a record to
    ``app_state["processed_files"]``. Failures are logged, not raised;
    ``app_state["processing_active"]`` is reset when the worker ends.

    Args:
        input_path: Path of the video to read.
        output_path: Path the upscaled video is written to.
    """
    def process_worker():
        cap = None
        out = None
        try:
            log_message(f"🎬 Starting 4K video upscaling: {os.path.basename(input_path)}")
            app_state["processing_active"] = True

            cap = cv2.VideoCapture(input_path)
            if not cap.isOpened():
                log_message("❌ Error: Could not open video")
                return

            # Keep the fractional frame rate: truncating e.g. 29.97 to 29
            # would change the playback speed of the output.
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            log_message(f"📹 Video: {w}x{h}, {fps:g}FPS, {frame_count} frames")

            target_w, target_h = w * 4, h * 4
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (target_w, target_h))
            if not out.isOpened():
                raise RuntimeError(f"Could not open video writer for: {output_path}")

            if torch.cuda.is_available():
                device = torch.device('cuda')
                log_message(f"🚀 Processing with GPU: {torch.cuda.get_device_name()}")
                process_frames_gpu(cap, out, device, target_h, target_w, frame_count)
            else:
                log_message("💻 Processing with CPU (may be slower)")
                process_frames_cpu(cap, out, target_h, target_w, frame_count)

            # Release before inspecting the file so the writer has flushed it.
            cap.release()
            out.release()
            cap = None
            out = None

            if os.path.exists(output_path):
                file_size = os.path.getsize(output_path)
                if file_size > 0:
                    log_message(f"✅ 4K video completed: {target_w}x{target_h}")
                    log_message(f"📁 Output file size: {file_size / (1024**2):.1f}MB")
                else:
                    log_message(f"❌ Output file is empty: {output_path}")
                    raise Exception("Output video file is empty")
            else:
                log_message(f"❌ Output file not created: {output_path}")
                raise Exception("Output video file was not created")

            app_state["processed_files"].append({
                "input_file": os.path.basename(input_path),
                "output_file": os.path.basename(output_path),
                "original_size": f"{w}x{h}",
                "upscaled_size": f"{target_w}x{target_h}",
                "frame_count": frame_count,
                # Whole frame rates stay ints, as before; fractional ones are rounded.
                "fps": int(fps) if float(fps).is_integer() else round(fps, 3),
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            })
        except Exception as e:
            log_message(f"❌ Error processing video: {str(e)}")
        finally:
            # Release whatever is still open, e.g. after a mid-stream failure.
            for handle in (cap, out):
                if handle is not None:
                    handle.release()
            app_state["processing_active"] = False
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    thread = threading.Thread(target=process_worker)
    thread.daemon = True
    thread.start()
def process_frames_cpu(cap, out, target_h, target_w, frame_count):
    """Read every frame from *cap*, bicubic-resize it on the CPU and write it to *out*.

    Args:
        cap: Opened capture object providing ``read()``.
        out: Writer object providing ``write(frame)``.
        target_h: Output frame height in pixels.
        target_w: Output frame width in pixels.
        frame_count: Total number of frames, used only for progress logging;
            may be 0 or negative when the source does not report it.
    """
    frame_num = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_num += 1

        upscaled_frame = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
        out.write(upscaled_frame)

        # Progress line every 30 frames.
        if frame_num % 30 == 0:
            if frame_count > 0:
                progress = (frame_num / frame_count) * 100
                log_message(f"🎞️ Processing frame {frame_num}/{frame_count} ({progress:.1f}%)")
            else:
                # Unknown total: a percentage would divide by zero.
                log_message(f"🎞️ Processing frame {frame_num}")
def process_frames_gpu(cap, out, device, target_h, target_w, frame_count):
    """Read every frame from *cap*, bicubic-upscale it with PyTorch and write it to *out*.

    A frame whose GPU processing raises is resized with OpenCV on the CPU
    instead, so the output keeps the same number of frames.

    Args:
        cap: Opened capture object providing ``read()``.
        out: Writer object providing ``write(frame)``.
        device: torch device the interpolation runs on.
        target_h: Output frame height in pixels.
        target_w: Output frame width in pixels.
        frame_count: Total number of frames, used only for progress logging;
            may be 0 or negative when the source does not report it.
    """
    frame_num = 0
    torch.backends.cudnn.benchmark = True
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_num += 1
        try:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # Move the uint8 frame first, then convert: less data to transfer.
            frame_tensor = torch.from_numpy(frame_rgb).to(device).float() / 255.0
            frame_tensor = frame_tensor.permute(2, 0, 1).unsqueeze(0)  # HWC -> BCHW
            with torch.no_grad():
                upscaled = torch.nn.functional.interpolate(
                    frame_tensor,
                    size=(target_h, target_w),
                    mode='bicubic',
                    align_corners=False
                )
                # Bicubic interpolation can overshoot [0, 1]; without the clamp
                # those values wrap around in the uint8 conversion.
                upscaled = upscaled.clamp(0, 1)
            result_cpu = upscaled.squeeze(0).permute(1, 2, 0).cpu().numpy()
            result_frame = (result_cpu * 255).astype(np.uint8)
            result_bgr = cv2.cvtColor(result_frame, cv2.COLOR_RGB2BGR)
            out.write(result_bgr)
        except Exception as e:
            log_message(f"⚠️ GPU processing failed for frame {frame_num}, using CPU fallback: {str(e)}")
            upscaled_frame = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
            out.write(upscaled_frame)

        # Progress line every 30 frames.
        if frame_num % 30 == 0:
            if frame_count > 0:
                progress = (frame_num / frame_count) * 100
                log_message(f"🎞️ Processing frame {frame_num}/{frame_count} ({progress:.1f}%)")
            else:
                # Unknown total: a percentage would divide by zero.
                log_message(f"🎞️ Processing frame {frame_num}")

        # Periodic memory cleanup
        if frame_num % 60 == 0 and torch.cuda.is_available():
            torch.cuda.empty_cache()
def process_frame_batch(frame_batch, out, device, target_h, target_w):
    """Upscale a list of BGR frames on *device* and write them to *out* in order.

    The whole batch is interpolated in one call. If that fails, the error is
    logged and the frames are processed one at a time instead.

    Args:
        frame_batch: Sequence of BGR frames (NumPy arrays) of equal size.
        out: Writer object providing ``write(frame)``.
        device: torch device the interpolation runs on.
        target_h: Output frame height in pixels.
        target_w: Output frame width in pixels.
    """
    def _to_chw(frame):
        # BGR HWC uint8 -> RGB CHW float in [0, 1] on the device.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame_tensor = torch.from_numpy(frame_rgb).float().to(device) / 255.0
        return frame_tensor.permute(2, 0, 1)

    def _to_bgr_frames(upscaled):
        # Clamp first: bicubic overshoot outside [0, 1] would wrap in uint8.
        hwc = upscaled.clamp(0, 1).permute(0, 2, 3, 1).cpu().numpy()
        return [
            cv2.cvtColor((item * 255).astype(np.uint8), cv2.COLOR_RGB2BGR)
            for item in hwc
        ]

    try:
        with torch.no_grad():
            batch_tensor = torch.stack([_to_chw(frame) for frame in frame_batch], dim=0)
            upscaled_batch = torch.nn.functional.interpolate(
                batch_tensor,
                size=(target_h, target_w),
                mode='bicubic',
                align_corners=False,
                antialias=True
            )
            # Convert everything before writing anything, so a conversion
            # failure cannot leave frames written twice once the fallback runs.
            results = _to_bgr_frames(upscaled_batch)
    except Exception as e:
        log_message(f"❌ Error in batch processing: {str(e)}")
        # Fallback: process frames individually
        for frame in frame_batch:
            with torch.no_grad():
                upscaled = torch.nn.functional.interpolate(
                    _to_chw(frame).unsqueeze(0),
                    size=(target_h, target_w),
                    mode='bicubic',
                    align_corners=False
                )
                single = _to_bgr_frames(upscaled)
            out.write(single[0])
        return

    for result_bgr in results:
        out.write(result_bgr)
# Create the upload/output folders at import time.
ensure_directories()

# Register the periodic cleanup job with `schedule`; the job only runs once
# something calls schedule.run_pending() (see run_scheduler).
schedule.every(CLEANUP_INTERVAL_MINUTES).minutes.do(cleanup_old_files)

# Flask application object.
app = Flask(__name__)
def index():
    """Render the ``index.html`` template.

    NOTE(review): presumably registered as a Flask route; no route decorator
    is visible in this view of the file.
    """
    page = render_template('index.html')
    return page
def api_system():
    """Return GPU and storage information as a JSON response.

    The payload is ``{"success": True, "data": info}``; if gathering the
    information raises, ``{"success": False, "error": ...}`` is returned
    instead. Storage details are only collected when ``/data`` exists.
    """
    def _listing(folder):
        # Names in the folder, or nothing when the folder is missing.
        return os.listdir(folder) if os.path.exists(folder) else []

    def _total_bytes(folder, names):
        # Combined size of the regular files among *names*.
        total = 0
        for name in names:
            path = os.path.join(folder, name)
            if os.path.isfile(path):
                total += os.path.getsize(path)
        return total

    try:
        info = {}

        # GPU section
        if torch.cuda.is_available():
            gib = 1024 ** 3
            device_name = torch.cuda.get_device_name()
            total_memory = torch.cuda.get_device_properties(0).total_memory
            allocated_memory = torch.cuda.memory_allocated()
            info.update({
                "gpu_available": True,
                "gpu_name": device_name,
                "gpu_memory": f"{total_memory / gib:.1f}GB",
                "gpu_memory_used": f"{allocated_memory / gib:.1f}GB",
                "gpu_memory_free": f"{(total_memory - allocated_memory) / gib:.1f}GB",
                "cuda_version": torch.version.cuda,
                "pytorch_version": torch.__version__,
            })
        else:
            info.update({
                "gpu_available": False,
                "gpu_name": "CPU Only (No GPU detected)",
                "gpu_memory": "N/A",
                "gpu_memory_used": "N/A",
                "gpu_memory_free": "N/A",
                "cuda_version": "Not available",
                "pytorch_version": torch.__version__,
            })

        # Storage section
        if not os.path.exists("/data"):
            info["persistent_storage"] = False
        else:
            info["persistent_storage"] = True
            try:
                mib = 1024 ** 2
                upload_files = _listing(UPLOAD_FOLDER)
                output_files = _listing(OUTPUT_FOLDER)
                upload_size = _total_bytes(UPLOAD_FOLDER, upload_files)
                output_size = _total_bytes(OUTPUT_FOLDER, output_files)
                info["storage_uploads"] = f"{upload_size / mib:.1f}MB"
                info["storage_outputs"] = f"{output_size / mib:.1f}MB"
                # Counts are taken from the raw listings.
                info["upload_files_count"] = len(upload_files)
                info["output_files_count"] = len(output_files)
                info["cleanup_stats"] = app_state["cleanup_stats"]
                info["cleanup_interval"] = f"{CLEANUP_INTERVAL_MINUTES} minutes"
                info["file_max_age"] = f"{FILE_MAX_AGE_HOURS} hour(s)"
            except Exception as err:
                info["storage_uploads"] = f"Error: {str(err)}"
                info["storage_outputs"] = "N/A"
                info["upload_files_count"] = 0
                info["output_files_count"] = 0

        return jsonify({"success": True, "data": info})
    except Exception as err:
        return jsonify({"success": False, "error": str(err)})
def api_upload():
    """Store an uploaded file and start its 4K upscaling in the background.

    Reads the ``file`` part of the current request, saves it under
    UPLOAD_FOLDER with a UUID-based name and hands it to the image or video
    upscaler, which does its work on a separate thread.

    Returns:
        A JSON response. On success it carries ``file_id``, ``filename``,
        ``output_filename`` and ``media_type``; otherwise ``success`` is
        False and ``error`` describes the problem.
    """
    image_exts = ('png', 'jpg', 'jpeg', 'gif')
    try:
        if 'file' not in request.files:
            return jsonify({"success": False, "error": "No file provided"})
        file = request.files['file']
        if file.filename == '':
            return jsonify({"success": False, "error": "No file selected"})
        if not (file and allowed_file(file.filename)):
            return jsonify({"success": False, "error": "File type not allowed"})

        file_id = str(uuid.uuid4())
        # Take the extension from the raw name that allowed_file() just
        # validated: secure_filename() can drop everything before the dot
        # (e.g. for non-ASCII names), leaving no '.' to split on.
        file_ext = file.filename.rsplit('.', 1)[1].lower()
        filename = secure_filename(file.filename)

        input_filename = f"{file_id}_input.{file_ext}"
        input_path = os.path.join(UPLOAD_FOLDER, input_filename)
        file.save(input_path)

        output_filename = f"{file_id}_4k.{file_ext}"
        output_path = os.path.join(OUTPUT_FOLDER, output_filename)

        # Log before the worker thread starts so the log reads in order.
        log_message(f"📤 File uploaded: {filename}")
        log_message(f"🎯 Starting 4K transformation...")

        # allowed_file() only admits image and video extensions, so anything
        # that is not an image here is a video.
        if file_ext in image_exts:
            upscale_image_4k(input_path, output_path)
            media_type = "image"
        else:
            upscale_video_4k(input_path, output_path)
            media_type = "video"

        return jsonify({
            "success": True,
            "file_id": file_id,
            "filename": filename,
            "output_filename": output_filename,
            "media_type": media_type,
            "message": "Upload successful, processing started"
        })
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})
def api_processing_status():
    """Return the processing flag and the processed-file history as JSON."""
    payload = {
        "success": True,
        "processing": app_state["processing_active"],
        "processed_files": app_state["processed_files"],
    }
    return jsonify(payload)
def api_download(filename):
    """Send a processed file from OUTPUT_FOLDER as an attachment.

    Args:
        filename: Name of a file directly inside OUTPUT_FOLDER. It is treated
            as untrusted input.

    Returns:
        The file response, a 404 JSON error when the name does not resolve to
        a regular file directly inside OUTPUT_FOLDER, or a 500 JSON error
        when sending fails.
    """
    try:
        # Resolve the path and require it to sit directly in the output
        # folder, so names such as "../x" cannot reach other files.
        root = os.path.realpath(OUTPUT_FOLDER)
        file_path = os.path.realpath(os.path.join(root, filename))
        if os.path.dirname(file_path) != root or not os.path.isfile(file_path):
            return jsonify({"error": "File not found"}), 404

        safe_name = os.path.basename(file_path)
        return send_file(
            file_path,
            as_attachment=True,
            download_name=f"4k_upscaled_{safe_name}",
            mimetype=get_file_mimetype(safe_name)
        )
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def api_preview(filename):
    """Send a processed file from OUTPUT_FOLDER for inline display.

    Args:
        filename: Name of a file directly inside OUTPUT_FOLDER. It is treated
            as untrusted input.

    Returns:
        The file response, a 404 JSON error when the name does not resolve to
        a regular file directly inside OUTPUT_FOLDER, or a 500 JSON error
        when sending fails.
    """
    try:
        # Resolve the path and require it to sit directly in the output
        # folder, so names such as "../x" cannot reach other files.
        root = os.path.realpath(OUTPUT_FOLDER)
        file_path = os.path.realpath(os.path.join(root, filename))
        if os.path.dirname(file_path) != root or not os.path.isfile(file_path):
            return jsonify({"error": "File not found"}), 404

        mimetype = get_file_mimetype(os.path.basename(file_path))
        return send_file(file_path, mimetype=mimetype)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
def api_logs():
    """Return the in-memory application log as JSON."""
    payload = {"success": True, "logs": app_state["logs"]}
    return jsonify(payload)
def api_clear_logs():
    """Empty the in-memory log, record that it was cleared, and confirm as JSON."""
    # Start over with a fresh list; the confirmation line becomes its first entry.
    app_state["logs"] = []
    log_message("🧹 Logs cleared")
    payload = {"success": True, "message": "Logs cleared"}
    return jsonify(payload)
def api_optimize_gpu():
    """Run optimize_gpu() and report its outcome as JSON."""
    try:
        if optimize_gpu():
            return jsonify({"success": True, "message": "GPU optimized"})
        return jsonify({"success": False, "message": "GPU optimization failed"})
    except Exception as err:
        return jsonify({"success": False, "error": str(err)})
def api_clear_cache():
    """Release cached GPU memory and forget the processed-file history.

    Only the in-memory history list is reset; no files are touched here.
    """
    try:
        cuda_present = torch.cuda.is_available()
        if cuda_present:
            torch.cuda.empty_cache()
        app_state["processed_files"] = []
        log_message("🧹 Cache and history cleared")
        return jsonify({"success": True, "message": "Cache cleared"})
    except Exception as err:
        return jsonify({"success": False, "error": str(err)})
def api_cleanup_now():
    """Run cleanup_old_files() immediately and confirm as JSON."""
    try:
        cleanup_old_files()
    except Exception as err:
        return jsonify({"success": False, "error": str(err)})
    return jsonify({"success": True, "message": "Manual cleanup completed"})
def api_storage_stats():
    """Return per-folder file listings and size totals as JSON.

    For each of the upload and output folders that exists, every regular
    file is listed with its size in MB, its modification time (reported
    under the key ``created``) and its age in hours. Cumulative cleanup
    statistics are included as well.
    """
    try:
        bytes_per_mb = 1024 * 1024
        stats = {
            "cleanup_stats": app_state["cleanup_stats"],
            "current_files": {},
            "total_storage_mb": 0
        }

        folders = (("uploads", UPLOAD_FOLDER), ("outputs", OUTPUT_FOLDER))
        for label, folder in folders:
            if not os.path.exists(folder):
                continue

            entries = []
            folder_bytes = 0
            for name in os.listdir(folder):
                path = os.path.join(folder, name)
                if not os.path.isfile(path):
                    continue
                size = os.path.getsize(path)
                modified = datetime.fromtimestamp(os.path.getmtime(path))
                entries.append({
                    "name": name,
                    "size_mb": size / bytes_per_mb,
                    "created": modified.strftime("%Y-%m-%d %H:%M:%S"),
                    "age_hours": (datetime.now() - modified).total_seconds() / 3600
                })
                folder_bytes += size

            stats["current_files"][label] = {
                "files": entries,
                "count": len(entries),
                "total_size_mb": folder_bytes / bytes_per_mb
            }
            stats["total_storage_mb"] += folder_bytes / bytes_per_mb

        return jsonify({"success": True, "data": stats})
    except Exception as err:
        return jsonify({"success": False, "error": str(err)})
if __name__ == '__main__':
    # Startup sequence: scheduler, GPU setup, initial cleanup, then the server.
    log_message("🚀 4K Upscaler starting...")
    try:
        # Background thread that runs the scheduled cleanup job.
        run_scheduler()

        gpu_ready = optimize_gpu()
        if gpu_ready:
            log_message("✅ GPU optimized for 4K upscaling")
        else:
            log_message("⚠️ GPU optimization failed, using CPU fallback")

        log_message("🧹 Running initial file cleanup...")
        cleanup_old_files()

        for line in (
            "✅ 4K Upscaler ready",
            "📤 Upload images or videos to upscale to 4K resolution",
            f"🗑️ Files will be automatically deleted after {FILE_MAX_AGE_HOURS} hour(s)",
        ):
            log_message(line)
    except Exception as err:
        # Initialization problems are logged; the server is still started below.
        log_message(f"❌ Initialization error: {str(err)}")
        log_message("⚠️ Starting in fallback mode...")

    # Serve on all interfaces, port 7860.
    try:
        app.run(host='0.0.0.0', port=7860, debug=False, threaded=True)
    except Exception as err:
        log_message(f"❌ Server startup error: {str(err)}")
        print(f"Critical error: {str(err)}")