# testai / app.py
# MStater's picture
# Upload app.py
# ecca37f verified
# app.py
from __future__ import annotations
# ── CPU-only guard (set *before* any torch/transformers import) ─────────────────
import os
# Environment defaults that keep the process on the CPU and quiet noisy
# libraries. ``setdefault`` leaves any value already present in the
# environment untouched.
for _env_name, _env_default in (
    ("CUDA_VISIBLE_DEVICES", ""),  # hide all GPUs
    ("FORCE_CPU", "1"),
    ("TRANSFORMERS_NO_ADVISORY_WARNINGS", "1"),
    ("TOKENIZERS_PARALLELISM", "false"),
    ("TORCHDYNAMO_DISABLE", "1"),  # avoid dynamo import shenanigans
    ("DISABLE_TORCH_COMPILE", "1"),
):
    os.environ.setdefault(_env_name, _env_default)
del _env_name, _env_default
# Optional: make every torch.load default to CPU even in 3rd-party code
def _patch_torch_cpu_only():
    """Best-effort monkey-patch that steers an installed torch onto the CPU.

    Three things are patched on the imported ``torch`` module:

    * ``torch.load`` is wrapped so that ``map_location`` defaults to the CPU
      device whenever the caller did not supply one.
    * ``torch.cuda`` is replaced by a shim object that reports "no CUDA" and
      turns every other attribute into a no-op callable.
    * ``torch.set_default_device("cpu")`` is called when available.

    Any failure (including torch not being installed) is swallowed on
    purpose: the patch is optional and must never prevent the app from
    starting.
    """
    try:
        import types

        import torch

        # map_location='cpu' for all torch.load by default
        _orig_load = torch.load

        def _load_cpu_default(*args, **kwargs):
            # torch.load(f, map_location, ...) also accepts map_location as
            # its second positional argument. Injecting the keyword in that
            # case would raise "got multiple values for argument
            # 'map_location'", so only add the default when the caller left
            # the positional slot empty.
            if len(args) < 2:
                kwargs.setdefault("map_location", torch.device("cpu"))
            return _orig_load(*args, **kwargs)

        torch.load = _load_cpu_default  # type: ignore

        # A permissive cuda shim so stray calls don't crash on CPU machines
        class _CudaShim(types.SimpleNamespace):
            def is_available(self):
                return False

            def device_count(self):
                return 0

            def current_device(self):
                return 0

            def manual_seed_all(self, *a, **k):
                return None

            def empty_cache(self):
                return None

            def ipc_collect(self):
                return None

            def synchronize(self, *a, **k):
                return None

            def set_device(self, *a, **k):
                return None

            def __getattr__(self, _):  # amp/memory/etc.
                # Only reached for names not defined above: hand back a
                # callable that accepts anything and returns None.
                return lambda *a, **k: None

        torch.cuda = _CudaShim()  # type: ignore

        try:
            # Not available on older torch releases; ignore if missing.
            torch.set_default_device("cpu")
        except Exception:
            pass
    except Exception:
        # If torch isn't installed yet or anything fails, just keep going.
        pass
# Apply the CPU-only patches once at import time, ahead of the imports below.
_patch_torch_cpu_only()
# ────────────────────────────────────────────────────────────────────────────────
# If you *do* need to import internal FlashVSR modules later, make them discoverable
import sys
from pathlib import Path
# Absolute directory that contains this file.
ROOT = Path(__file__).resolve().parents[0]
# Optional FlashVSR checkout expected right next to this file.
FLASHVSR_DIR = ROOT.joinpath("FlashVSR")
# When the checkout is present, put it first on the module search path.
if FLASHVSR_DIR.exists():
    sys.path.insert(0, str(FLASHVSR_DIR))
# ── App logic (pure CPU pipeline using FFmpeg) ──────────────────────────────────
import shlex
import subprocess
from datetime import datetime
import gradio as gr
def _run(cmd: str) -> tuple[int, str]:
    """Run *cmd* through the shell and capture its combined output.

    stderr is merged into stdout. Each output line has trailing whitespace
    stripped and the lines are joined with newlines.

    Args:
        cmd: Complete shell command line. NOTE: it is executed with
            ``shell=True``, so callers must quote every interpolated value
            (e.g. with ``shlex.quote``) themselves.

    Returns:
        ``(returncode, output)`` of the finished process.
    """
    # The context manager closes the stdout pipe and waits for the child on
    # exit, including when reading the output raises.
    with subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        # Tools such as ffmpeg echo arbitrary file metadata; undecodable
        # bytes must not abort log capture with UnicodeDecodeError.
        errors="replace",
    ) as proc:
        lines = [line.rstrip() for line in proc.stdout]  # type: ignore[union-attr]
    return proc.returncode, "\n".join(lines)
def normalize_to_playable_mp4(in_path: str) -> tuple[str, str]:
    """Re-encode *in_path* into an H.264 / yuv420p MP4 under ``ROOT/outputs``.

    The first FFmpeg attempt keeps the audio (re-encoded to AAC). If that
    attempt exits non-zero or leaves no output file, a second attempt is made
    with the audio dropped.

    Returns:
        ``(out_path, combined_log)`` where ``out_path`` is
        ``<outputs>/<input stem>_playable.mp4``.

    Raises:
        RuntimeError: when the no-audio fallback also fails; the message
            carries the logs of both attempts.
    """
    target_dir = ROOT / "outputs"
    target_dir.mkdir(parents=True, exist_ok=True)
    out_path = str(target_dir / f"{Path(in_path).stem}_playable.mp4")

    src = shlex.quote(in_path)
    dst = shlex.quote(out_path)
    # Video options shared by both attempts: force even dimensions and
    # yuv420p, then encode with libx264.
    video_opts = (
        '-vf "scale=trunc(iw/2)*2:trunc(ih/2)*2,format=yuv420p" '
        "-c:v libx264 -preset veryfast -crf 20"
    )

    # Attempt with audio
    with_audio_cmd = " ".join(
        [
            f"ffmpeg -y -i {src}",
            "-map 0:v:0 -map 0:a?",
            video_opts,
            f"-c:a aac -b:a 160k -movflags +faststart {dst}",
        ]
    )
    first_code, first_log = _run(with_audio_cmd)
    first_ok = first_code == 0 and Path(out_path).exists()
    if first_ok:
        return out_path, first_log

    # Fallback: strip audio entirely
    without_audio_cmd = " ".join(
        [
            f"ffmpeg -y -probesize 100M -analyzeduration 100M -i {src}",
            "-map 0:v:0 -an",
            video_opts,
            f"-movflags +faststart {dst}",
        ]
    )
    second_code, second_log = _run(without_audio_cmd)
    if second_code == 0:
        return out_path, f"{first_log}\n\n-- Fallback (no-audio) log --\n{second_log}"

    raise RuntimeError(
        "Failed to normalize video to MP4.\n\nFIRST ATTEMPT LOG:\n"
        f"{first_log}\n\nSECOND ATTEMPT LOG:\n{second_log}"
    )
def upscale_x4_ffmpeg(in_path: str) -> tuple[str, str]:
    """Resize *in_path* to four times its size with FFmpeg's Lanczos scaler.

    The result is written to ``ROOT/outputs/<input stem>_x4.mp4`` as
    H.264 / yuv420p video with AAC audio (audio is mapped only if present).

    Returns:
        ``(out_path, log)``.

    Raises:
        RuntimeError: when FFmpeg exits with a non-zero status; the message
            includes the captured log.
    """
    target_dir = ROOT / "outputs"
    target_dir.mkdir(parents=True, exist_ok=True)
    out_path = str(target_dir / f"{Path(in_path).stem}_x4.mp4")

    pieces = [
        f"ffmpeg -y -i {shlex.quote(in_path)}",
        "-map 0:v:0 -map 0:a?",
        # x4 in each dimension, rounded down to an even number of pixels.
        '-vf "scale=trunc(iw*4/2)*2:trunc(ih*4/2)*2:flags=lanczos,format=yuv420p"',
        "-c:v libx264 -preset veryfast -crf 18",
        f"-c:a aac -b:a 192k -movflags +faststart {shlex.quote(out_path)}",
    ]
    status, ffmpeg_log = _run(" ".join(pieces))
    if status == 0:
        return out_path, ffmpeg_log
    raise RuntimeError("x4 upscale failed.\n\nLOG:\n" + ffmpeg_log)
def pipeline(video_path: str) -> tuple[str, str, str]:
    """Normalize *video_path* to a playable MP4, then upscale that result x4.

    Returns:
        ``(playable_path, x4_path, combined_log)``. The combined log starts
        with a UTC start timestamp and the input path, followed by the
        normalize and upscale logs, separated by blank lines.

    Raises:
        RuntimeError: propagated from ``normalize_to_playable_mp4`` or
            ``upscale_x4_ffmpeg`` when FFmpeg fails.
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp and drop tzinfo so the rendered text keeps the same
    # "YYYY-MM-DDTHH:MM:SS" shape (the " UTC" suffix is appended below).
    ts = datetime.now(timezone.utc).replace(tzinfo=None).isoformat(timespec="seconds")
    logs = [f"Start: {ts} UTC", f"Input: {video_path}"]

    playable, log1 = normalize_to_playable_mp4(video_path)
    logs.append("Normalize log:\n" + log1)

    # Baseline CPU x4 (works everywhere). Insert FlashVSR-CPU call here later if needed.
    x4, log2 = upscale_x4_ffmpeg(playable)
    logs.append("Upscale log:\n" + log2)

    return playable, x4, "\n\n".join(logs)
with gr.Blocks(title="CPU Video Normalize + x4 (FFmpeg baseline)") as app:
    # Intro text shown at the top of the page.
    gr.Markdown(
        """
# CPU Video Normalize + x4 (baseline)
- **Strictly CPU-only** runtime (CUDA disabled; TorchDynamo off).
- Step 1: Convert input to a widely playable MP4 (`yuv420p`).
- Step 2: x4 upscale with Lanczos (FFmpeg).
Integrate FlashVSR-CPU later between steps if desired.
"""
    )

    # Components, in the order they are created: input, trigger, two video
    # results and the log box.
    inp = gr.Video(label="Upload a video", sources=["upload"], format="mp4", height=240)
    btn = gr.Button("Process (CPU)")
    playable_out = gr.Video(label="Playable MP4")
    x4_out = gr.Video(label="x4 Upscaled (CPU)")
    log_out = gr.Textbox(label="Logs", lines=18)

    def _go(video_file):
        """Click handler: resolve the uploaded file's path and run the pipeline.

        Accepts either a path string or a mapping with a ``"name"`` entry.
        Returns the ``(playable, x4, logs)`` triple produced by ``pipeline``.
        Raises ``gr.Error`` when nothing was uploaded.
        """
        if not video_file:
            raise gr.Error("Please upload a video.")
        if isinstance(video_file, str):
            in_path = video_file
        else:
            in_path = video_file.get("name")
        return pipeline(in_path)

    btn.click(_go, inputs=[inp], outputs=[playable_out, x4_out, log_out])
if __name__ == "__main__":
    # Enable the request queue, then serve on all interfaces. The port comes
    # from the PORT environment variable and falls back to 7860.
    queued_app = app.queue()
    listen_port = int(os.environ.get("PORT", 7860))
    queued_app.launch(server_name="0.0.0.0", server_port=listen_port)