|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
import os
|
|
|
# Environment defaults for a CPU-only run. `setdefault` leaves any value that
# is already present in the environment untouched, so callers can override
# each of these from outside the process.
for _env_name, _env_value in (
    ("CUDA_VISIBLE_DEVICES", ""),
    ("FORCE_CPU", "1"),
    ("TRANSFORMERS_NO_ADVISORY_WARNINGS", "1"),
    ("TOKENIZERS_PARALLELISM", "false"),
    ("TORCHDYNAMO_DISABLE", "1"),
    ("DISABLE_TORCH_COMPILE", "1"),
):
    os.environ.setdefault(_env_name, _env_value)
# Do not leave the loop variables behind as module attributes.
del _env_name, _env_value
|
|
|
|
|
|
|
|
|
def _patch_torch_cpu_only():
    """Best-effort monkey-patch of PyTorch so that work stays on the CPU.

    When ``torch`` can be imported this:

    * wraps ``torch.load`` so that ``map_location`` defaults to the CPU
      device whenever the caller did not supply one;
    * replaces ``torch.cuda`` with a shim that reports no devices and turns
      every other attribute access into a no-op callable;
    * calls ``torch.set_default_device("cpu")``, ignoring any failure.

    Every failure (including torch not being installed) is deliberately
    swallowed, so importing this module never fails because of the patch.
    """
    try:
        import types

        import torch

        _orig_load = torch.load

        def _load_cpu_default(*args, **kwargs):
            # `map_location` is the second positional parameter of
            # torch.load. Only inject the CPU default when the caller has
            # not already passed it positionally; otherwise the call would
            # fail with "got multiple values for argument 'map_location'".
            if len(args) < 2:
                kwargs.setdefault("map_location", torch.device("cpu"))
            return _orig_load(*args, **kwargs)

        torch.load = _load_cpu_default

        class _CudaShim(types.SimpleNamespace):
            """Stand-in for ``torch.cuda`` that behaves as if no GPU exists."""

            def is_available(self):
                return False

            def device_count(self):
                return 0

            def current_device(self):
                return 0

            def manual_seed_all(self, *a, **k):
                return None

            def empty_cache(self):
                return None

            def ipc_collect(self):
                return None

            def synchronize(self, *a, **k):
                return None

            def set_device(self, *a, **k):
                return None

            def __getattr__(self, _):
                # Anything not defined above resolves to a callable that
                # accepts any arguments and returns None.
                return lambda *a, **k: None

        torch.cuda = _CudaShim()

        try:
            torch.set_default_device("cpu")
        except Exception:
            # Best effort: ignored when the call is unavailable or fails.
            pass
    except Exception:
        # Best effort by design: never let the patch break module import.
        pass
|
|
|
|
|
|
# Apply the CPU-only patches at import time, ahead of the `gradio` import further down.
_patch_torch_cpu_only()
|
|
|
|
|
|
|
|
|
|
|
|
import sys
|
|
|
from pathlib import Path
|
|
|
# Directory that contains this file; output paths are built relative to it.
ROOT = Path(__file__).resolve().parents[0]
FLASHVSR_DIR = ROOT.joinpath("FlashVSR")

# Put the sibling FlashVSR directory at the front of the import path, but
# only when it actually exists on disk.
if FLASHVSR_DIR.exists():
    sys.path[:0] = [str(FLASHVSR_DIR)]
|
|
|
|
|
|
|
|
|
import shlex
|
|
|
import subprocess
|
|
|
from datetime import datetime
|
|
|
import gradio as gr
|
|
|
|
|
|
def _run(cmd: str) -> tuple[int, str]:
    """Run *cmd* through the shell and capture its combined output.

    stderr is merged into stdout. Output is read line by line, each line is
    right-stripped, and the lines are joined with newlines.

    Args:
        cmd: A complete shell command line. It is executed with
            ``shell=True``, so callers are responsible for quoting every
            argument.

    Returns:
        ``(exit_code, output)``.
    """
    # The context manager closes the stdout pipe and waits for the child on
    # exit, so the pipe's file descriptor is not left open.
    with subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    ) as proc:
        lines = [line.rstrip() for line in proc.stdout]
    return proc.returncode, "\n".join(lines)
|
|
|
|
|
|
def normalize_to_playable_mp4(in_path: str) -> tuple[str, str]:
    """Re-encode *in_path* into a widely playable H.264 / yuv420p MP4.

    The result is written to ``ROOT / "outputs" / "<stem>_playable.mp4"``.
    The first attempt keeps the (optional) audio stream, re-encoded to AAC.
    If that attempt fails, for example because the audio is invalid (0-ch,
    corrupt), a second attempt drops audio entirely.

    Args:
        in_path: Path of the video to convert.

    Returns:
        ``(out_path, combined_log)``. After a fallback the log contains the
        output of both attempts.

    Raises:
        RuntimeError: If both attempts fail; the message carries both logs.
    """
    out_dir = ROOT / "outputs"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = str(out_dir / f"{Path(in_path).stem}_playable.mp4")

    # Shared by both attempts. The scale expression rounds width and height
    # down to even values before converting to yuv420p.
    video_args = [
        "-vf", "scale=trunc(iw/2)*2:trunc(ih/2)*2,format=yuv420p",
        "-c:v", "libx264", "-preset", "veryfast", "-crf", "20",
    ]
    tail_args = ["-movflags", "+faststart", out_path]

    # Commands are assembled as argv lists and rendered with shlex.join so
    # that every argument is shell-quoted. This matters for `0:a?`: left
    # unquoted, the `?` is a glob character open to pathname expansion.
    cmd_keep_audio = shlex.join(
        [
            "ffmpeg", "-y", "-i", in_path,
            "-map", "0:v:0", "-map", "0:a?",
            *video_args,
            "-c:a", "aac", "-b:a", "160k",
            *tail_args,
        ]
    )
    code, log = _run(cmd_keep_audio)
    if code == 0 and Path(out_path).exists():
        return out_path, log

    # Fallback: larger probe window, video stream only, no audio.
    cmd_no_audio = shlex.join(
        [
            "ffmpeg", "-y",
            "-probesize", "100M", "-analyzeduration", "100M",
            "-i", in_path,
            "-map", "0:v:0", "-an",
            *video_args,
            *tail_args,
        ]
    )
    code2, log2 = _run(cmd_no_audio)
    # Same success criterion as the first attempt: exit code 0 AND a file.
    if code2 != 0 or not Path(out_path).exists():
        raise RuntimeError(
            "Failed to normalize video to MP4.\n\nFIRST ATTEMPT LOG:\n"
            + log + "\n\nSECOND ATTEMPT LOG:\n" + log2
        )
    return out_path, log + "\n\n-- Fallback (no-audio) log --\n" + log2
|
|
|
|
|
|
def upscale_x4_ffmpeg(in_path: str) -> tuple[str, str]:
    """Upscale *in_path* by a factor of four with FFmpeg's Lanczos scaler.

    The result is written to ``ROOT / "outputs" / "<stem>_x4.mp4"`` as
    H.264 / yuv420p. An audio stream, when present, is re-encoded to AAC.

    Args:
        in_path: Path of the video to upscale.

    Returns:
        ``(out_path, log)``.

    Raises:
        RuntimeError: If FFmpeg exits with a non-zero status; the message
            carries the captured log.
    """
    out_dir = ROOT / "outputs"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = str(out_dir / f"{Path(in_path).stem}_x4.mp4")

    # Built as an argv list and rendered with shlex.join so every argument
    # is shell-quoted; an unquoted `0:a?` would expose `?` to shell globbing.
    cmd = shlex.join(
        [
            "ffmpeg", "-y", "-i", in_path,
            "-map", "0:v:0", "-map", "0:a?",
            # 4x the input size, rounded down to even width and height.
            "-vf", "scale=trunc(iw*4/2)*2:trunc(ih*4/2)*2:flags=lanczos,format=yuv420p",
            "-c:v", "libx264", "-preset", "veryfast", "-crf", "18",
            "-c:a", "aac", "-b:a", "192k",
            "-movflags", "+faststart", out_path,
        ]
    )
    code, log = _run(cmd)
    if code != 0:
        raise RuntimeError("x4 upscale failed.\n\nLOG:\n" + log)
    return out_path, log
|
|
|
|
|
|
def pipeline(video_path: str) -> tuple[str, str, str]:
    """Normalize *video_path* to a playable MP4, then upscale that MP4 x4.

    Args:
        video_path: Path of the uploaded video.

    Returns:
        ``(playable_path, x4_path, log_text)`` where ``log_text`` combines
        the start timestamp, the input path and the logs of both steps.

    Raises:
        RuntimeError: Propagated from either step when FFmpeg fails.
    """
    # Local import: the module level only imports `datetime` itself.
    from datetime import timezone

    # `datetime.utcnow()` is deprecated since Python 3.12. Take an aware UTC
    # time and drop the tzinfo so the rendered text has no "+00:00" suffix;
    # the log line appends " UTC" itself.
    ts = (
        datetime.now(timezone.utc)
        .replace(tzinfo=None)
        .isoformat(timespec="seconds")
    )
    logs = [f"Start: {ts} UTC", f"Input: {video_path}"]

    playable, log1 = normalize_to_playable_mp4(video_path)
    logs.append("Normalize log:\n" + log1)

    # The upscaler works on the normalized file, not on the raw upload.
    x4, log2 = upscale_x4_ffmpeg(playable)
    logs.append("Upscale log:\n" + log2)

    return playable, x4, "\n\n".join(logs)
|
|
|
|
|
|
with gr.Blocks(title="CPU Video Normalize + x4 (FFmpeg baseline)") as app:
    # Static introduction shown above the controls.
    gr.Markdown(
        """
# CPU Video Normalize + x4 (baseline)

- **Strictly CPU-only** runtime (CUDA disabled; TorchDynamo off).
- Step 1: Convert input to a widely playable MP4 (`yuv420p`).
- Step 2: x4 upscale with Lanczos (FFmpeg).
Integrate FlashVSR-CPU later between steps if desired.
"""
    )

    # Input widget and trigger button.
    inp = gr.Video(
        label="Upload a video",
        sources=["upload"],
        format="mp4",
        height=240,
    )
    btn = gr.Button("Process (CPU)")

    # Output widgets, in the order the click handler returns its values.
    playable_out = gr.Video(label="Playable MP4")
    x4_out = gr.Video(label="x4 Upscaled (CPU)")
    log_out = gr.Textbox(label="Logs", lines=18)

    def _go(video_file):
        """Click handler: run the pipeline on the uploaded video.

        Accepts either a path string or a mapping with a ``"name"`` entry,
        and returns ``(playable_path, x4_path, logs)``.
        """
        if not video_file:
            raise gr.Error("Please upload a video.")

        if isinstance(video_file, str):
            source_path = video_file
        else:
            source_path = video_file.get("name")

        playable_path, x4_path, log_text = pipeline(source_path)
        return playable_path, x4_path, log_text

    btn.click(_go, inputs=[inp], outputs=[playable_out, x4_out, log_out])
|
|
|
|
|
|
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from $PORT, defaulting to 7860.
    _port = int(os.environ.get("PORT", 7860))
    _queued_app = app.queue()
    _queued_app.launch(server_name="0.0.0.0", server_port=_port)
|
|
|
|