Spaces:
Runtime error
Runtime error
| import cv2 | |
| import numpy as np | |
| import torch | |
| import io | |
| import asyncio | |
| from basicsr.archs.rrdbnet_arch import RRDBNet | |
| from realesrgan import RealESRGANer | |
| from huggingface_hub import hf_hub_download | |
| from concurrent.futures import ThreadPoolExecutor | |
| class VideoEnhancer: | |
| def __init__(self, model_name="RealESRGAN_x4plus"): | |
| self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| self.model = self.load_model(model_name) | |
| self.executor = ThreadPoolExecutor(max_workers=4) | |
| def load_model(self, model_name): | |
| if model_name == "RealESRGAN_x4plus": | |
| model = RRDBNet(num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=4) | |
| model_path = hf_hub_download("schwgHao/RealESRGAN_x4plus", "RealESRGAN_x4plus.pth") | |
| return RealESRGANer(scale=4, model_path=model_path, model=model, tile=0, tile_pad=10, pre_pad=0, half=True) | |
| else: | |
| raise ValueError(f"Unsupported model: {model_name}") | |
| async def enhance_frame(self, frame): | |
| loop = asyncio.get_running_loop() | |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| enhanced, _ = await loop.run_in_executor(self.executor, self.model.enhance, frame_rgb) | |
| return cv2.cvtColor(enhanced, cv2.COLOR_RGB2BGR) | |
| async def process_video(self, input_bytes, output_bytes): | |
| cap = cv2.VideoCapture(input_bytes) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(output_bytes, fourcc, fps, (width * 4, height * 4)) | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| enhanced_frame = await self.enhance_frame(frame) | |
| out.write(enhanced_frame) | |
| cap.release() | |
| out.release() | |
| async def stream_enhanced_video(self, video_file): | |
| video_bytes = await video_file.read() | |
| cap = cv2.VideoCapture(io.BytesIO(video_bytes).getvalue()) | |
| async def generate(): | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| enhanced_frame = await self.enhance_frame(frame) | |
| _, buffer = cv2.imencode('.jpg', enhanced_frame) | |
| yield (b'--frame\r\n' | |
| b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n') | |
| cap.release() | |
| return StreamingResponse(generate(), media_type="multipart/x-mixed-replace; boundary=frame") |